package com.unboundid.directory.sdk.examples;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.unboundid.directory.sdk.common.types.LogSeverity;
import com.unboundid.directory.sdk.sync.api.KafkaSyncDestinationPlugin;
import com.unboundid.directory.sdk.sync.config.KafkaSyncDestinationPluginConfig;
import com.unboundid.directory.sdk.sync.types.KafkaMessage;
import com.unboundid.directory.sdk.sync.types.PreStepResult;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.ldap.sdk.ResultCode;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.StringArgument;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

 * This class provides a simple example of a Kafka sync destination plugin
 * which filters out test data and removes passwords from Kafka messages.
 * Currently, a single API is provided for manipulating messages prior to
 * their being published to a Kafka topic.
 * <UL>
 *   <LI>
 *     {@link #prePublish(KafkaMessage, SyncOperation)} } -- Allows for the
 *     transformation or filtering of messages prior to publishing.
 *   </LI>
 * </UL>
 * It takes the following argument:
 * <UL>
 *   <LI>filter-attribute -- One or more attribute names which will
 *                           be removed from any messages prior to
 *                           publishing.</LI>
 * </UL>
 * Note: This extension is provided as an example of the capabilities
 * of a Kafka sync destination plugin. In a production environment, attribute
 * modification and filtering would be handled by attribute mapping and
 * do not require the use of any plugins.
public class ExampleKafkaSyncDestinationPlugin
        extends KafkaSyncDestinationPlugin {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  private static final String ARG_NAME_FILTER_ATTRIBUTES =

  private SyncServerContext serverContext;

  private volatile List<String> filterAttributes;

   * Retrieves a human-readable name for this extension.
   * @return  A human-readable name for this extension.
  public String getExtensionName() {
    return "Example Kafka Sync Destination Plugin";

   * Retrieves a human-readable description for this extension.  Each element
   * of the array that is returned will be considered a separate paragraph in
   * generated documentation.
   * @return  A human-readable description for this extension, or {@code null}
   *          or an empty array if no description should be available.
  public String[] getExtensionDescription() {
    return new String[]{
            "This is a simple example of a Kafka sync destination plugin " +
                    "which will filter out any messages intended to be " +
                    "test data. Additionally, it can be configured to filter " +
                    "out individual attributes on messages. For example, " +
                    "passwords or other sensitive attributes."

   * Updates the provided argument parser to define any configuration arguments
   * which may be used by this sync pipe plugin.  The argument parser may
   * also be updated to define relationships between arguments (e.g., to specify
   * required, exclusive, or dependent argument sets).
   * @param  parser  The argument parser to be updated with the configuration
   *                 arguments which may be used by this sync pipe plugin.
   * @throws ArgumentException  If a problem is encountered while updating the
   *                            provided argument parser.
  public void defineConfigArguments(final ArgumentParser parser)
          throws ArgumentException {
    // Add an argument that allows for the specification of a list
    // of attribute names to filter out of Kafka Messages.
    Character shortIdentifier = null;
    String longIdentifier = ARG_NAME_FILTER_ATTRIBUTES;
    boolean required = false;
    int maxOccurences = -1; // multi-valued
    String placeHolder = "{attr-name}";
    String description = "The name of the attribute (post mapping) whose" +
            " value should be filtered out of any messages.";

    StringArgument arg = new StringArgument(
            shortIdentifier, longIdentifier,
            required, maxOccurences,
            placeHolder, description

   * Initialize this Kafka sync destination plugin. This method will be called
   * before any other methods in this class.
   * @param  serverContext  A handle to the server context for the
   *                        Data Sync Server in which this extension is
   *                        running. Extensions should typically store this
   *                        in a class member.
   * @param  config         The general configuration for this plugin.
   * @param  parser         The argument parser which has been initialized from
   *                        the configuration for this Kafka sync destination
   *                        plugin.
  public void initializeKafkaSyncDestinationPlugin(
          final SyncServerContext serverContext,
          final KafkaSyncDestinationPluginConfig config,
          final ArgumentParser parser) {
    this.serverContext = serverContext;
    this.filterAttributes =

   * Attempts to apply the configuration contained in the provided argument
   * parser.
   * @param  config                The general configuration for this Kafka sync
   *                               destination plugin.
   * @param  parser                The argument parser which has been
   *                               initialized with the new configuration.
   * @param  adminActionsRequired  A list that can be updated with information
   *                               about any administrative actions that may be
   *                               required before one or more of the
   *                               configuration changes will be applied.
   * @param  messages              A list that can be updated with information
   *                               about the result of applying the new
   *                               configuration.
   * @return  A result code that provides information about the result of
   *          attempting to apply the configuration change.
  public ResultCode applyConfiguration(
          final KafkaSyncDestinationPluginConfig config,
          final ArgumentParser parser,
          final List<String> adminActionsRequired,
          final List<String> messages
  ) {
    this.filterAttributes =
    return ResultCode.SUCCESS;

   * Retrieves a map containing examples of configurations that may be used for
   * this extension.  The map key should be a list of sample arguments, and the
   * corresponding value should be a description of the behavior that will be
   * exhibited by the extension when used with that configuration.
   * @return  A map containing examples of configurations that may be used for
   *          this extension.  It may be {@code null} or empty if there should
   *          not be any example argument sets.
  public Map<List<String>, String> getExamplesArgumentSets() {
    final LinkedHashMap<List<String>, String> exampleMap =
            new LinkedHashMap<>(1);

                    ARG_NAME_FILTER_ATTRIBUTES + "=userPassword",
                    ARG_NAME_FILTER_ATTRIBUTES + "=authPassword"
            "Filter password attributes."

    return exampleMap;

   * Provides the ability to alter or suppress Kafka messages.
   * This method is called before a message is published to the
   * configured Kafka Topic.
   * @param message   The message to be published to the
   *                  Kafka topic.
   * @param operation The synchronization operation for this
   *                  change.
   * @return The result of the plugin processing.
  public PreStepResult prePublish(final KafkaMessage message,
                                  final SyncOperation operation) {

    // Skip any operations which contain test data.
    if (message.getKey().contains("test-data")) {
      operation.logInfo("Not sending Kafka message for test data.");
      return PreStepResult.ABORT_OPERATION;

    // Retrieve the current message.
    String currentMessageValueString = message.getValue();

    try {
      // Parse it into JSON.
      JsonNode currentMessageValueJSON =

      // Strip out sensitive fields
      // Note: The removal of sensitive attributes can also be
      // handled by attribute mapping.
      for (String attrName : filterAttributes) {
        for (String entryField : Arrays.asList("current", "previous")) {
          if (currentMessageValueJSON.has(entryField)) {
            if (currentMessageValueJSON.get(entryField).has(attrName)) {
              operation.logDebug("Removing sensitive attribute " + attrName +
                      " from the operation.");
              ((ObjectNode) currentMessageValueJSON.get(entryField))

      // Save the filtered JSON back into the message.

      // Return thumbs up.
      return PreStepResult.CONTINUE;
    } catch (IOException e) {
              "Unable to parse Kafka Message as JSON! Exception: " +

      // Try again a few times in case of a transient issue.
      return PreStepResult.RETRY_OPERATION_LIMITED;

   * Appends a string representation of this Kafka sync destination plugin to
   * the provided buffer.
   * @param buffer The buffer to which the string representation should be
   *               appended.
  public void toString(final StringBuilder buffer) {
    if (filterAttributes.size() > 0) {
      for (int i = 0; i < filterAttributes.size(); i++) {
        buffer.append(ARG_NAME_FILTER_ATTRIBUTES + "='")
        if (i != filterAttributes.size() - 1) {
          buffer.append(", ");