001 /*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License"). You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt. If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 * Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 * Copyright 2010-2013 UnboundID Corp.
026 */
027 package com.unboundid.directory.sdk.sync.scripting;
028
029 import java.io.Serializable;
030 import java.sql.SQLException;
031 import java.util.LinkedList;
032 import java.util.List;
033 import java.util.Iterator;
034 import java.util.concurrent.BlockingQueue;
035 import java.util.concurrent.atomic.AtomicLong;
036
037 import com.unboundid.ldap.sdk.Entry;
038 import com.unboundid.util.Extensible;
039 import com.unboundid.util.ThreadSafety;
040 import com.unboundid.util.ThreadSafetyLevel;
041 import com.unboundid.util.args.ArgumentException;
042 import com.unboundid.util.args.ArgumentParser;
043 import com.unboundid.directory.sdk.common.internal.Configurable;
044 import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
045 import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
046 import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
047 import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
048 import com.unboundid.directory.sdk.sync.types.SyncOperation;
049 import com.unboundid.directory.sdk.sync.types.SyncServerContext;
050 import com.unboundid.directory.sdk.sync.types.TransactionContext;
051
052 /**
053 * This class defines an API that must be implemented by scripted extensions
054 * in order to synchronize data out of a relational database. Since the
055 * UnboundID Synchronization Server is LDAP-centric,
056 * this API allows you to take database content and convert it into LDAP
057 * entries which can then be processed by the Synchronization Server. The
058 * lifecycle of
059 * a sync operation is as follows:
060 * <ol>
061 * <li>Detect change at the synchronization source</li>
062 * <li>Fetch full source entry</li>
063 * <li>Perform any mappings and compute the equivalent destination entry</li>
064 * <li>Fetch full destination entry</li>
065 * <li>Diff the computed destination entry and actual (fetched) destination
066 * entry</li>
067 * <li>Apply the minimal set of changes at the destination to bring it in sync
068 * </li>
069 * </ol>
070 * This implies that the
071 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be
072 * called once for every change that is returned by
073 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
074 * <p>
075 * In several places a {@link TransactionContext} is provided, which allows
076 * controlled access to the target database. By default, methods in this class
077 * are always provided with a fresh connection (i.e. a new transaction), and the
078 * Synchronization Server will always commit or rollback the transaction
079 * automatically, depending on whether the method returned normally or threw an
080 * exception. Implementers may optionally perform their own transaction
081 * management within these methods if necessary.
082 * <p>
083 * Several of these methods throw {@link SQLException}, which should be used in
084 * the case of any database access error. For other types of errors, runtime
085 * exceptions may be used (IllegalStateException, NullPointerException, etc.).
086 * The Synchronization Server will automatically retry operations that fail, up
087 * to a configurable number of attempts. The exception to this rule is if a
088 * SQLException is thrown with a SQL state string beginning with "08"; this
089 * indicates a connection error, and in this case the operation is retried
090 * indefinitely.
091 * <BR>
092 * <H2>Configuring Groovy-Scripted JDBC Sync Sources</H2>
093 * In order to configure a scripted JDBC sync source based on this API and
094 * written in the Groovy scripting language, use a command like:
095 * <PRE>
096 * dsconfig create-sync-source \
097 * --source-name "<I>{source-name}</I>" \
098 * --type groovy-scripted-jdbc \
099 * --set "server:{server-name}" \
100 * --set "script-class:<I>{class-name}</I>" \
101 * --set "script-argument:<I>{name=value}</I>"
102 * </PRE>
103 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source
104 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that
105 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified
106 * name of the Groovy class written using this API, and "<I>{name=value}</I>"
107 * represents name-value pairs for any arguments to provide to the JDBC sync
108 * source. If multiple arguments should be provided to the JDBC sync source,
109 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option
110 * should be provided multiple times.
111 */
112 @Extensible()
113 @SynchronizationServerExtension(appliesToLocalContent=false,
114 appliesToSynchronizedContent=true)
115 public abstract class ScriptedJDBCSyncSource implements Configurable
116 {
117 /**
118 * {@inheritDoc}
119 */
120 public void defineConfigArguments(final ArgumentParser parser)
121 throws ArgumentException
122 {
123 // No arguments will be allowed by default.
124 }
125
126 /**
127 * This hook is called when a Sync Pipe first starts up, when the
128 * <i>resync</i> process first starts up, or when the set-startpoint
129 * subcommand is called from the <i>realtime-sync</i> command line tool.
130 * Any initialization of this sync source should be performed here. This
131 * method should generally store the {@link SyncServerContext} in a class
132 * member so that it can be used elsewhere in the implementation.
133 * <p>
134 * A {@link TransactionContext} is provided, which allows
135 * controlled access to the target database. The context will contain a fresh
136 * fresh connection (i.e. a new transaction), and the Synchronization Server
137 * will always commit or rollback the transaction automatically, depending on
138 * whether this method returns normally or throws an exception. Implementers
139 * may optionally perform their own transaction management within this method
140 * if necessary.
141 * <p>
142 * The default implementation is empty.
143 *
144 * @param ctx
145 * a TransactionContext which provides a valid JDBC connection to the
146 * database.
147 * @param serverContext A handle to the server context for the server in
148 * which this extension is running.
149 * @param config The general configuration for this sync source.
150 * @param parser The argument parser which has been initialized from
151 * the configuration for this JDBC sync source.
152 */
153 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
154 public void initializeJDBCSyncSource(final TransactionContext ctx,
155 final SyncServerContext serverContext,
156 final JDBCSyncSourceConfig config,
157 final ArgumentParser parser)
158 {
159 // No initialization will be performed by default.
160 }
161
162 /**
163 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
164 * process shuts down, or when the set-startpoint subcommand (from the
165 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
166 * sync source should be performed here.
167 * <p>
168 * A {@link TransactionContext} is provided, which allows
169 * controlled access to the target database. The context will contain a fresh
170 * fresh connection (i.e. a new transaction), and the Synchronization Server
171 * will always commit or rollback the transaction automatically, depending on
172 * whether this method returns normally or throws an exception. Implementers
173 * may optionally perform their own transaction management within this method
174 * if necessary.
175 * <p>
176 * The default implementation is empty.
177 *
178 * @param ctx
179 * a TransactionContext which provides a valid JDBC connection to the
180 * database.
181 */
182 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
183 public void finalizeJDBCSyncSource(final TransactionContext ctx)
184 {
185 //no implementation required
186 }
187
188 /**
189 * This method should effectively set the starting point for synchronization
190 * to the place specified by the <code>options</code> parameter. This should
191 * cause all changes previous to the specified start point to be disregarded
192 * and only changes after that point to be returned by
193 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
194 * <p>
195 * There are several different startpoint types (see
196 * {@link SetStartpointOptions}), and this implementation is not required to
197 * support them all. If the specified startpoint type is unsupported, this
198 * method should throw an {@link IllegalArgumentException}.
199 * <p>
200 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
201 * must be supported by your implementation, because this is used when a Sync
202 * Pipe first starts up.
203 * <p>
204 * This method can be called from two different contexts:
205 * <ul>
206 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
207 * (the Sync Pipe is required to be stopped in this context)</li>
208 * <li>Immediately after a Sync Pipe starts up and a connection is first
209 * established to the source server (e.g. before the first call to
210 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
211 * </ul>
212 * <p>
213 * A {@link TransactionContext} is provided, which allows
214 * controlled access to the target database. The context will contain a fresh
215 * fresh connection (i.e. a new transaction), and the Synchronization Server
216 * will always commit or rollback the transaction automatically, depending on
217 * whether this method returns normally or throws an exception. Implementers
218 * may optionally perform their own transaction management within this method
219 * if necessary.
220 *
221 * @param ctx
222 * a TransactionContext which provides a valid JDBC connection to the
223 * database.
224 * @param options
225 * an object which indicates where exactly to start synchronizing
226 * (e.g.
227 * the end of the changelog, specific change number, a certain time
228 * ago, etc)
229 * @throws SQLException
230 * if there is any error while setting the start point
231 */
232 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
233 public abstract void setStartpoint(final TransactionContext ctx,
234 final SetStartpointOptions options)
235 throws SQLException;
236
237 /**
238 * Gets the current value of the startpoint for change detection. This is the
239 * "bookmark" which indicates which changes have already been processed and
240 * which have not. In most cases, a change number is used to detect changes
241 * and is managed by the Synchronization Server, in which case this
242 * implementation needs only to return the latest acknowledged
243 * change number. In other cases, the return value may correspond to a
244 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
245 * In any case, this method should return the value that is updated by
246 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
247 * <p>
248 * This method is called periodically and the return value is saved in the
249 * persistent state for the Sync Pipe that uses this script as its Sync
250 * Source.
251 * <p>
252 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
253 * updated after a sync operation is acknowledged back to this script (via
254 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
255 * Otherwise it will be possible for changes to be missed when the
256 * Synchronization Server is restarted or a connection error occurs.
257 * @return a value to store in the persistent state for the Sync Pipe. This is
258 * usually
259 * a change number, but if a changelog table is not used to detect
260 * changes,
261 * this value should represent some other token to pass into
262 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
263 * when the sync pipe starts up.
264 */
265 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
266 public abstract Serializable getStartpoint();
267
268 /**
269 * Return the next batch of change records from the database. Change records
270 * are just hints that a change happened; they do not include the actual data
271 * of the change. In an effort to never synchronize stale data, the
272 * Synchronization Server will go back and fetch the full source entry for
273 * each change record.
274 * <p>
275 * On the first invocation, this should return changes starting from the
276 * startpoint that was set by
277 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
278 * method is responsible for updating the internal state such that subsequent
279 * invocations do not return duplicate changes.
280 * <p>
281 * The resulting list should be limited by <code>maxChanges</code>. The
282 * <code>numStillPending</code> reference should be set to the estimated
283 * number of changes that haven't yet been retrieved from the changelog table
284 * when this method returns, or zero if all the current changes have been
285 * retrieved.
286 * <p>
287 * <b>IMPORTANT</b>: While this method needs to keep track of which changes
288 * have already been returned so that it does not return them again, it should
289 * <b>NOT</b> modify the official startpoint. The internal value for the
290 * startpoint should only be updated after a sync operation is acknowledged
291 * back to this script (via
292 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
293 * Otherwise it will be possible for changes to be missed when the
294 * Synchronization Server is restarted or a connection error occurs. The
295 * startpoint should not change as a result of this method.
296 * <p>
297 * A {@link TransactionContext} is provided, which allows
298 * controlled access to the target database. The context will contain a fresh
299 * fresh connection (i.e. a new transaction), and the Synchronization Server
300 * will always commit or rollback the transaction automatically, depending on
301 * whether this method returns normally or throws an exception. Implementers
302 * may optionally perform their own transaction management within this method
303 * if necessary.
304 * <p>
305 * This method <b>does not need to be thread-safe</b>. It will be invoked
306 * repeatedly by a single thread, based on the polling interval set in the
307 * Sync Pipe configuration.
308 * @param ctx
309 * a TransactionContext which provides a valid JDBC connection to the
310 * database.
311 * @param maxChanges
312 * the maximum number of changes to retrieve
313 * @param numStillPending
314 * this should be set to the number of unretrieved changes that
315 * are still pending after this batch has been retrieved. This will
316 * be passed in
317 * as zero, and may be left that way if the actual value cannot be
318 * determined.
319 * @return a list of {@link DatabaseChangeRecord} instances, each
320 * corresponding
321 * to a row in the changelog table (or the equivalent if some other
322 * change
323 * tracking mechanism is being used). If there are no new changes to
324 * return, this
325 * method should return an empty list.
326 * @throws SQLException
327 * if there is any error while retrieving the next batch of changes
328 */
329 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
330 public abstract List<DatabaseChangeRecord> getNextBatchOfChanges(
331 final TransactionContext ctx,
332 final int maxChanges,
333 final AtomicLong numStillPending)
334 throws SQLException;
335
336 /**
337 * Return a full source entry (in LDAP form) from the database, corresponding
338 * to the {@link DatabaseChangeRecord} that is passed in through the
339 * {@link SyncOperation}. This method should perform any queries necessary to
340 * gather the latest values for all the attributes to be synchronized.
341 * <p>
342 * A {@link TransactionContext} is provided, which allows
343 * controlled access to the target database. The context will contain a fresh
344 * fresh connection (i.e. a new transaction), and the Synchronization Server
345 * will always commit or rollback the transaction automatically, depending on
346 * whether this method returns normally or throws an exception. Implementers
347 * may optionally perform their own transaction management within this method
348 * if necessary.
349 * <p>
350 * This method <b>must be thread safe</b>, as it will be called repeatedly and
351 * concurrently by each of the Sync Pipe worker threads as they process
352 * entries.
353 * @param ctx
354 * a TransactionContext which provides a valid JDBC connection to the
355 * database.
356 * @param operation
357 * the SyncOperation which identifies the database "entry" to
358 * fetch. The DatabaseChangeRecord can be obtained by calling
359 * <code>operation.getDatabaseChangeRecord()</code>.
360 * This is returned by
361 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
362 * or by
363 * {@link #listAllEntries(TransactionContext, String, BlockingQueue)}
364 * .
365 * @return a full LDAP Entry, or null if no such entry exists.
366 * @throws SQLException
367 * if there is an error fetching the entry
368 */
369 @ThreadSafety(level = ThreadSafetyLevel.METHOD_THREADSAFE)
370 public abstract Entry fetchEntry(final TransactionContext ctx,
371 final SyncOperation operation)
372 throws SQLException;
373
374 /**
375 * Provides a way for the Synchronization Server to acknowledge back to the
376 * script which sync operations it has processed. This method should update
377 * the official startpoint which was set by
378 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
379 * returned by {@link #getStartpoint()}.
380 * <p>
381 * <b>IMPORTANT</b>: The internal value for the startpoint should only be
382 * updated after a sync operation is acknowledged back to this script (via
383 * this method). Otherwise it will be possible for changes to be missed when
384 * the Synchronization Server is restarted or a connection error occurs.
385 * <p>
386 * A {@link TransactionContext} is provided in case the acknowledgment needs
387 * to make it all the way back to the database itself (for example if you were
388 * using Oracle's Change Data Capture). The context will contain a fresh
389 * fresh connection (i.e. a new transaction), and the Synchronization Server
390 * will always commit or rollback the transaction automatically, depending on
391 * whether this method returns normally or throws an exception. Implementers
392 * may optionally perform their own transaction management within this method
393 * if necessary.
394 *
395 * @param ctx
396 * a TransactionContext which provides a valid JDBC connection to the
397 * database.
398 * @param completedOps
399 * a list of {@link SyncOperation}s that have finished processing.
400 * The records are listed in the order they were
401 * first detected.
402 * @throws SQLException
403 * if there is an error acknowledging the changes back to the
404 * database
405 */
406 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
407 public abstract void acknowledgeCompletedOps(
408 final TransactionContext ctx,
409 final LinkedList<SyncOperation> completedOps)
410 throws SQLException;
411
412 /**
413 * Performs a cleanup of the changelog table (if desired). There is a
414 * background thread that periodically invokes this method. It should remove
415 * any rows in the changelog table that are more than
416 * <code>maxAgeMillis</code> milliseconds old.
417 * <p>
418 * <b>NOTE:</b> If the system clock on the database server is not in sync with
419 * the system clock on the Synchronization Server, this method should query
420 * the database for its current time in order to determine the cut-off point
421 * for deleting changelog records.
422 * <p>
423 * A {@link TransactionContext} is provided, which allows
424 * controlled access to the target database. The context will contain a fresh
425 * fresh connection (i.e. a new transaction), and the Synchronization Server
426 * will always commit or rollback the transaction automatically, depending on
427 * whether this method returns normally or throws an exception. Implementers
428 * may optionally perform their own transaction management within this method
429 * if necessary.
430 * <p>
431 * If a separate mechanism will be used to manage the changelog table, this
432 * method may be implemented as a no-op and always return zero. This is how
433 * the default implementation behaves.
434 * @param ctx
435 * a TransactionContext which provides a valid JDBC connection to the
436 * database.
437 * @param maxAgeMillis
438 * the period of time (in milliseconds) after which a changelog table
439 * record should be deleted
440 * @return the number of rows that were deleted from the changelog table
441 * @throws SQLException
442 * if there is an error purging records from the changelog table
443 */
444 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
445 public int cleanupChangelog(final TransactionContext ctx,
446 final long maxAgeMillis)
447 throws SQLException
448 {
449 //no implementation provided by default; this is an opt-in feature.
450 return 0;
451 }
452
453 /**
454 * Gets a list of all the entries in the database for a given entry type. This
455 * is used by the 'resync' command line tool. The default implementation
456 * throws a {@link UnsupportedOperationException}; subclasses should override
457 * if the resync functionality is needed.
458 * <p>
459 * The <code>entryType</code> is user-defined; it will be
460 * passed in on the command line for resync. The <code>outputQueue</code>
461 * should contain {@link DatabaseChangeRecord} objects with the
462 * <code>ChangeType</code> set to <i>resync</i>.
463 * <p>
464 * This method should not return until all the entries of the given entryType
465 * have been added to the output queue. Separate threads will concurrently
466 * drain entries from the queue and process them. The queue should not
467 * actually contain full entries, but rather DatabaseChangeRecord objects
468 * which identify the full database entries. These objects are then
469 * individually passed in to
470 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
471 * it is important to make sure that the DatabaseChangeRecord instances
472 * contain enough identifiable information (e.g. primary keys) for each entry
473 * so that the entry can be found again.
474 * <p>
475 * The lifecycle of resync is similar to that of real-time sync, with a few
476 * differences:
477 * <ol>
478 * <li>Stream out a list of all IDs in the database (for a given entryType)
479 * </li>
480 * <li>Fetch full source entry for an ID</li>
481 * <li>Perform any mappings and compute the equivalent destination entry</li>
482 * <li>Fetch full destination entry</li>
483 * <li>Diff the computed destination entry and actual destination entry</li>
484 * <li>Apply the minimal set of changes at the destination to bring it in sync
485 * </li>
486 * </ol>
487 * If the total set of entries is very large, it is fine to split up the work
488 * into multiple database queries within this method. The queue will not grow
489 * out of control because it blocks when it becomes full. The queue capacity
490 * is fixed at 1000.
491 * <p>
492 * A {@link TransactionContext} is provided, which allows
493 * controlled access to the target database. The context will contain a fresh
494 * fresh connection (i.e. a new transaction), and the Synchronization Server
495 * will always commit or rollback the transaction automatically, depending on
496 * whether this method returns normally or throws an exception. Implementers
497 * may optionally perform their own transaction management within this method
498 * if necessary.
499 *
500 * @param ctx
501 * a TransactionContext which provides a valid JDBC connection to the
502 * database.
503 * @param entryType
504 * the type of database entry to be fetched (this is specified
505 * on the CLI for the resync command)
506 * @param outputQueue
507 * a queue of DatabaseChangeRecord objects which will be individually
508 * fetched via
509 * {@link #fetchEntry(TransactionContext, SyncOperation)}
510 * @throws SQLException
511 * if there is an error retrieving the list of entries to resync
512 */
513 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
514 public void listAllEntries(final TransactionContext ctx,
515 final String entryType,
516 final BlockingQueue<DatabaseChangeRecord>
517 outputQueue) throws SQLException
518 {
519 throw new UnsupportedOperationException();
520 }
521
522 /**
523 * Note: This method is deprecated and may be removed in a future release.
524 * All new and existing code should be changed to use the version of this
525 * method which includes the <code>entryType</code> parameter.
526 * <p>
527 * Gets a list of all the entries in the database from a given file input.
528 * This is used by the 'resync' command line tool. The default implementation
529 * throws a {@link UnsupportedOperationException}; subclasses should override
530 * if the resync functionality is needed for specific database records, which
531 * can be specified in the input file.
532 * <p>
533 * The format for the <code>inputLines</code> (e.g. the content of the file)
534 * is user-defined; it may be key/value pairs, primary keys, or full SQL
535 * statements, for example. The use of this method is triggered via the
536 * <i>--sourceInputFile</i> argument on the resync CLI. The
537 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
538 * objects with the <code>ChangeType</code> set to <i>resync</i>.
539 * <p>
540 * This method should not return until all the entries specified by the input
541 * file have been added to the output queue. Separate threads will
542 * concurrently drain entries from the queue and process them. The queue
543 * should not actually contain full entries, but rather DatabaseChangeRecord
544 * objects which identify the full database entries. These objects are then
545 * individually passed in to
546 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
547 * it is important to make sure that the DatabaseChangeRecord instances
548 * contain enough identifiable information (e.g. primary keys) for each entry
549 * so that the entry can be found again.
550 * <p>
551 * The lifecycle of resync is similar to that of real-time sync, with a few
552 * differences:
553 * <ol>
554 * <li>Stream out a list of all IDs in the database (using the given input
555 * file)</li>
556 * <li>Fetch full source entry for an ID</li>
557 * <li>Perform any mappings and compute the equivalent destination entry</li>
558 * <li>Fetch full destination entry</li>
559 * <li>Diff the computed destination entry and actual destination entry</li>
560 * <li>Apply the minimal set of changes at the destination to bring it in sync
561 * </li>
562 * </ol>
563 * If the total set of entries is very large, it is fine to split up the work
564 * into multiple database queries within this method. The queue will not grow
565 * out of control because it blocks when it becomes full. The queue capacity
566 * is fixed at 1000.
567 * <p>
568 * A {@link TransactionContext} is provided, which allows
569 * controlled access to the target database. The context will contain a fresh
570 * fresh connection (i.e. a new transaction), and the Synchronization Server
571 * will always commit or rollback the transaction automatically, depending on
572 * whether this method returns normally or throws an exception. Implementers
573 * may optionally perform their own transaction management within this method
574 * if necessary.
575 *
576 * @param ctx
577 * a TransactionContext which provides a valid JDBC connection to the
578 * database.
579 * @param inputLines
580 * an Iterator containing the lines from the specified input file to
581 * resync (this is specified on the CLI for the resync command).
582 * These lines can be any format, for example a set of primary keys,
583 * a set of WHERE clauses, a set of full SQL queries, etc.
584 * @param outputQueue
585 * a queue of DatabaseChangeRecord objects which will be individually
586 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
587 * @throws SQLException
588 * if there is an error retrieving the list of entries to resync
589 */
590 @Deprecated
591 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
592 public void listAllEntries(final TransactionContext ctx,
593 final Iterator<String> inputLines,
594 final BlockingQueue<DatabaseChangeRecord>
595 outputQueue) throws SQLException
596 {
597 throw new UnsupportedOperationException();
598 }
599
600 /**
601 * Gets a list of all the entries in the database from a given file input.
602 * This is used by the 'resync' command line tool. The default implementation
603 * throws a {@link UnsupportedOperationException}; subclasses should override
604 * if the resync functionality is needed for specific database records, which
605 * can be specified in the input file.
606 * <p>
607 * The format for the <code>inputLines</code> (e.g. the content of the file)
608 * is user-defined; it may be key/value pairs, primary keys, or full SQL
609 * statements, for example. The use of this method is triggered via the
610 * <i>--sourceInputFile</i> argument on the resync CLI. The
611 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
612 * objects with the <code>ChangeType</code> set to <i>resync</i>.
613 * <p>
614 * This method should not return until all the entries specified by the input
615 * file have been added to the output queue. Separate threads will
616 * concurrently drain entries from the queue and process them. The queue
617 * should not actually contain full entries, but rather DatabaseChangeRecord
618 * objects which identify the full database entries. These objects are then
619 * individually passed in to
620 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
621 * it is important to make sure that the DatabaseChangeRecord instances
622 * contain enough identifiable information (e.g. primary keys) for each entry
623 * so that the entry can be found again.
624 * <p>
625 * The lifecycle of resync is similar to that of real-time sync, with a few
626 * differences:
627 * <ol>
628 * <li>Stream out a list of all IDs in the database (using the given input
629 * file)</li>
630 * <li>Fetch full source entry for an ID</li>
631 * <li>Perform any mappings and compute the equivalent destination entry</li>
632 * <li>Fetch full destination entry</li>
633 * <li>Diff the computed destination entry and actual destination entry</li>
634 * <li>Apply the minimal set of changes at the destination to bring it in sync
635 * </li>
636 * </ol>
637 * If the total set of entries is very large, it is fine to split up the work
638 * into multiple database queries within this method. The queue will not grow
639 * out of control because it blocks when it becomes full. The queue capacity
640 * is fixed at 1000.
641 * <p>
642 * A {@link TransactionContext} is provided, which allows
643 * controlled access to the target database. The context will contain a fresh
644 * fresh connection (i.e. a new transaction), and the Synchronization Server
645 * will always commit or rollback the transaction automatically, depending on
646 * whether this method returns normally or throws an exception. Implementers
647 * may optionally perform their own transaction management within this method
648 * if necessary.
649 *
650 * @param ctx
651 * a TransactionContext which provides a valid JDBC connection to the
652 * database.
653 * @param entryType
654 * the type of database entry to be fetched (this is specified
655 * on the CLI for the resync command)
656 * @param inputLines
657 * an Iterator containing the lines from the specified input file to
658 * resync (this is specified on the CLI for the resync command).
659 * These lines can be any format, for example a set of primary keys,
660 * a set of WHERE clauses, a set of full SQL queries, etc.
661 * @param outputQueue
662 * a queue of DatabaseChangeRecord objects which will be individually
663 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
664 * @throws SQLException
665 * if there is an error retrieving the list of entries to resync
666 */
667 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
668 public void listAllEntries(final TransactionContext ctx,
669 final String entryType,
670 final Iterator<String> inputLines,
671 final BlockingQueue<DatabaseChangeRecord>
672 outputQueue) throws SQLException
673 {
674 throw new UnsupportedOperationException();
675 }
676 }