001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Portions Copyright 2019-2024 Ping Identity Corporation 026 */ 027package com.unboundid.directory.sdk.sync.api; 028 029import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider; 030import com.unboundid.directory.sdk.common.internal.Reconfigurable; 031import com.unboundid.directory.sdk.common.internal.UnboundIDExtension; 032import com.unboundid.directory.sdk.sync.config.KafkaSyncDestinationPluginConfig; 033import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension; 034import com.unboundid.directory.sdk.sync.types.KafkaMessage; 035import com.unboundid.directory.sdk.sync.types.PreStepResult; 036import com.unboundid.directory.sdk.sync.types.SyncOperation; 037import com.unboundid.directory.sdk.sync.types.SyncServerContext; 038import com.unboundid.ldap.sdk.ResultCode; 039import com.unboundid.util.Extensible; 040import com.unboundid.util.ThreadSafety; 041import com.unboundid.util.ThreadSafetyLevel; 042import com.unboundid.util.args.ArgumentException; 043import com.unboundid.util.args.ArgumentParser; 044 045import java.util.Collections; 046import java.util.List; 047import java.util.Map; 048 049/** 050 * This class defines an API that must be implemented by extensions that 051 * perform processing on synchronization operations within a Kafka Sync 052 * Destination. These extensions may be used to 053 * <UL> 054 * <LI>Filter out messages being published to Kafka.</LI> 055 * <LI>Modify the format or contents of messages before publishing.</LI> 056 * <LI>Modify the message key before publishing.</LI> 057 * </UL> 058 * <BR> 059 * <H2>Configuring Kafka Sync Destination Plugins</H2> 060 * In order to configure a Kafka sync destination plugin created using this API, 061 * use a command like: 062 * <PRE> 063 * dsconfig create-sync-destination-plugin \ 064 * --plugin-name "<I>{plugin-name}</I>" \ 065 * --type third-party-kafka \ 066 * --set "extension-class:<I>{class-name}</I>" \ 067 * --set "extension-argument:<I>{name=value}</I>" 068 * </PRE> 069 * where "<I>{plugin-name}</I>" is the name to use for the Kafka sync 070 * destination plugin instance, "<I>{class-name}</I>" is the fully- 071 * qualified name of the Java class that extends 072 * {@code com.unboundid.directory.sdk.sync.api.KafkaSyncDestinationPlugin}, 073 * and "<I>{name=value}</I>" represents name-value pairs for any arguments to 074 * provide to the Kafka sync destination plugin. If multiple arguments should 075 * be provided to the plugin, then the "<CODE>extension-argument</CODE>" 076 * option should be provided multiple times. 077 */ 078@Extensible() 079@SynchronizationServerExtension(appliesToLocalContent = false, 080 appliesToSynchronizedContent = true) 081@ThreadSafety(level = ThreadSafetyLevel.INTERFACE_THREADSAFE) 082public abstract class KafkaSyncDestinationPlugin 083 implements UnboundIDExtension, 084 Reconfigurable<KafkaSyncDestinationPluginConfig>, 085 ExampleUsageProvider 086{ 087 088 /** 089 * Creates a new instance of this Kafka sync destination plugin. 090 * All sync destination implementations must include a default 091 * constructor, but any initialization should generally be done in 092 * the {@code initializeKafkaSyncDestinationPlugin} method. 093 */ 094 public KafkaSyncDestinationPlugin() 095 { 096 // No default behavior. 097 } 098 099 100 101 /** 102 * Retrieves a human-readable name for this extension. 103 * 104 * @return A human-readable name for this extension. 105 */ 106 @Override 107 public abstract String getExtensionName(); 108 109 110 111 /** 112 * Retrieves a human-readable description for this extension. Each element 113 * of the array that is returned will be considered a separate paragraph in 114 * generated documentation. 115 * 116 * @return A human-readable description for this extension, or {@code null} 117 * or an empty array if no description should be available. 118 */ 119 @Override 120 public abstract String[] getExtensionDescription(); 121 122 123 124 /** 125 * Updates the provided argument parser to define any configuration arguments 126 * which may be used by this extension. The argument parser may also be 127 * updated to define relationships between arguments (e.g., to specify 128 * required, exclusive, or dependent argument sets). 129 * 130 * @param parser The argument parser to be updated with the configuration 131 * arguments which may be used by this extension. 132 * 133 * @throws ArgumentException If a problem is encountered while updating the 134 * provided argument parser. 135 */ 136 @Override 137 public void defineConfigArguments(final ArgumentParser parser) 138 throws ArgumentException 139 { 140 // No arguments will be allowed by default. 141 } 142 143 144 145 /** 146 * Initialized this Kafka sync destination plugin. This method will be 147 * called before any other methods in the class. 148 * 149 * @param serverContext A handle to the server context for the 150 * Data Sync Server in which this extension is 151 * running. Extensions should typically store this 152 * in a class member. 153 * @param config The general configuration for this plugin. 154 * @param parser The argument parser which has been initialized from 155 * the configuration for this Kafka sync destination 156 * plugin. 157 */ 158 public void initializeKafkaSyncDestinationPlugin( 159 final SyncServerContext serverContext, 160 final KafkaSyncDestinationPluginConfig config, 161 final ArgumentParser parser) 162 { 163 // No Initialization will be performed by default. 164 } 165 166 167 168 /** 169 * Indicates whether the configuration represented by the provided argument 170 * parser is acceptable for use by this extension. The parser will have been 171 * used to parse any configuration available for this extension, and any 172 * automatic validation will have been performed. This method may be used to 173 * perform any more complex validation which cannot be performed automatically 174 * by the argument parser. 175 * 176 * @param config The general configuration for this extension. 177 * @param parser The argument parser that has been used to 178 * parse the proposed configuration for this 179 * extension. 180 * @param unacceptableReasons A list to which messages may be added to 181 * provide additional information about why the 182 * provided configuration is not acceptable. 183 * 184 * @return {@code true} if the configuration in the provided argument parser 185 * appears to be acceptable, or {@code false} if not. 186 */ 187 @Override 188 public boolean isConfigurationAcceptable( 189 final KafkaSyncDestinationPluginConfig config, 190 final ArgumentParser parser, 191 final List<String> unacceptableReasons) 192 { 193 // No extended validation will be performed by default. 194 return true; 195 } 196 197 198 199 /** 200 * Attempts to apply the configuration from the provided argument parser to 201 * this extension. 202 * 203 * @param config The general configuration for this extension. 204 * @param parser The argument parser that has been used to 205 * parse the new configuration for this 206 * extension. 207 * @param adminActionsRequired A list to which messages may be added to 208 * provide additional information about any 209 * additional administrative actions that may 210 * be required to apply some of the 211 * configuration changes. 212 * @param messages A list to which messages may be added to 213 * provide additional information about the 214 * processing performed by this method. 215 * 216 * @return A result code providing information about the result of applying 217 * the configuration change. A result of {@code SUCCESS} should be 218 * used to indicate that all processing completed successfully. Any 219 * other result will indicate that a problem occurred during 220 * processing. 221 */ 222 @Override 223 public ResultCode applyConfiguration( 224 final KafkaSyncDestinationPluginConfig config, 225 final ArgumentParser parser, 226 final List<String> adminActionsRequired, 227 final List<String> messages) 228 { 229 // By default, no configuration changes will be applied. 230 return ResultCode.SUCCESS; 231 } 232 233 234 235 /** 236 * Performs any cleanup that may be necessary when this Kafka sync destination 237 * plugin is taken out of service. This can happen when it is deleted from 238 * the configuration and at server shutdown. 239 */ 240 public void finalizeKafkaSyncDestinationPlugin() 241 { 242 // No implementation is required. 243 } 244 245 /** 246 * Provides the ability to alter or suppress Kafka messages. 247 * This method is called before a message is published to the 248 * configured Kafka Topic. 249 * 250 * @param message The message to be published to the 251 * Kafka Topic. 252 * @param operation The synchronization operation for this 253 * change. 254 * 255 * @return The result of the plugin processing. 256 */ 257 public PreStepResult prePublish(final KafkaMessage message, 258 final SyncOperation operation) 259 { 260 return PreStepResult.CONTINUE; 261 } 262 263 /** 264 * Retrieves a string representation of this Kafka sync destination plugin. 265 * 266 * @return A string representation of this Kafka sync destination plugin. 267 */ 268 @Override() 269 public final String toString() 270 { 271 final StringBuilder buffer = new StringBuilder(); 272 toString(buffer); 273 return buffer.toString(); 274 } 275 276 /** 277 * Appends a string representation of this Kafka sync destination plugin to 278 * the provided buffer. 279 * 280 * @param buffer The buffer to which the string representation should be 281 * appended. 282 */ 283 public abstract void toString(final StringBuilder buffer); 284 285 /** 286 * Retrieves a map containing examples of configurations that may be used for 287 * this extension. The map key should be a list of sample arguments, and the 288 * corresponding value should be a description of the behavior that will be 289 * exhibited by the extension when used with that configuration. 290 * 291 * @return A map containing examples of configurations that may be used for 292 * this extension. It may be {@code null} or empty if there should 293 * not be any example argument sets. 294 */ 295 @Override 296 public Map<List<String>,String> getExamplesArgumentSets() 297 { 298 return Collections.emptyMap(); 299 } 300}