DataSpell 2023.2 Help

Kafka

The Kafka plugin lets you monitor your Kafka event streaming processes and create consumers, producers, and topics. It also lets you connect to Schema Registry and create and update schemas.

Install the Kafka plugin

This functionality relies on the Kafka plugin, which you need to install and enable.

  1. Press Control+Alt+S to open the IDE settings and then select Plugins.

  2. Open the Marketplace tab, find the Kafka plugin, and click Install (restart the IDE if prompted).

With the Kafka plugin, you can:

  • Connect to Kafka

  • Produce and consume data

  • Manage topics

  • Work with Schema Registry

If the Kafka plugin is installed and enabled, you can use the Kafka tool window (Window | Tool Windows | Kafka) to connect to Kafka and work with it. Alternatively, if the Remote File Systems or Zeppelin plugin is installed and enabled, you can also access Kafka connections using the Big Data Tools tool window (Window | Tool Windows | Big Data Tools).

Connect to Kafka

Connect to Kafka using cloud providers

Connect to Confluent cluster

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. In the Name field, enter the name of the connection to distinguish it from other connections.

  3. In the Configuration source list, select Cloud, and then, in the Provider list, select Confluent.

  4. Go to https://confluent.cloud/home. On the right side of the page, click the settings menu, select Environments, select your cluster, and then select Clients | Java.

    In the Copy the configuration snippet for your clients block, provide Kafka API keys and click Copy.

  5. Go back to your IDE and paste the copied properties into the Configuration field.

  6. Once you fill in the settings, click Test connection to ensure that all configuration parameters are correct. Then click OK.
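The snippet you paste into the Configuration field is a set of standard Kafka client properties. As a rough illustration (placeholder endpoint and credentials, not real values), a Confluent Cloud configuration typically looks like this, sketched here as a Python dict that can be serialized back to the properties format the field accepts:

```python
# Illustrative Confluent Cloud client properties (placeholder values,
# not real credentials). The plugin accepts the same keys that the
# Java Kafka client uses.
confluent_config = {
    "bootstrap.servers": "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092",  # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    # The API key and secret are injected into the JAAS config line:
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username='<API_KEY>' password='<API_SECRET>';"
    ),
}

def to_properties(config: dict) -> str:
    """Serialize a config dict to the key=value properties format."""
    return "\n".join(f"{key}={value}" for key, value in config.items())

print(to_properties(confluent_config))
```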

Kafka Confluent

Optionally, you can set up:

  • Enable connection: deselect if you want to disable this connection. By default, the newly created connections are enabled.

Connect to AWS MSK cluster

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. In the Name field, enter the name of the connection to distinguish it from other connections.

  3. In the Configuration source list, select Cloud, and then, in the Provider list, select AWS MSK.

  4. In the Bootstrap servers field, enter the URL of the Kafka broker or a comma-separated list of URLs.

  5. In the AWS Authentication list, select the authentication method.

    • Default credential providers chain: use the credentials from the default provider chain. For more information about the chain, refer to Using the Default Credential Provider Chain.

    • Profile from credentials file: select a profile from your credentials file.

    • Explicit access key and secret key: enter your credentials manually.

  6. Optionally, you can connect to Schema Registry.

  7. If you want to use an SSH tunnel while connecting to Kafka, select Enable tunneling and in the SSH configuration list, select an SSH configuration or create a new one.

  8. Once you fill in the settings, click Test connection to ensure that all configuration parameters are correct. Then click OK.

Kafka AWS MSK

Optionally, you can set up:

  • Enable connection: deselect if you want to disable this connection. By default, the newly created connections are enabled.

Connect to custom Kafka server

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. In the Name field, enter the name of the connection to distinguish it from other connections.

  3. In the Configuration source list, select Custom.

  4. In the Bootstrap servers field, enter the URL of the Kafka broker or a comma-separated list of URLs.

  5. Under Authentication, select an authentication method:

    • None: connect without authentication.

    • SASL: select a SASL mechanism (Plain, SCRAM-SHA-256, SCRAM-SHA-512, or Kerberos) and provide your username and password.

    • SSL

      • Select Validate server host name if you want to verify that the broker host name matches the host name in the broker certificate. Clearing the checkbox is equivalent to setting the ssl.endpoint.identification.algorithm property to an empty value.

      • In the Truststore location, provide a path to the SSL truststore location (ssl.truststore.location property).

      • In the Truststore password, provide the SSL truststore password (ssl.truststore.password property).

      • Select Use Keystore client authentication and provide values for Keystore location (ssl.keystore.location), Keystore password (ssl.keystore.password), and Key password (ssl.key.password).

    • AWS IAM: use AWS IAM for Amazon MSK. In the AWS Authentication list, select one of the following:

      • Default credential providers chain: use the credentials from the default provider chain. For more information about the chain, refer to Using the Default Credential Provider Chain.

      • Profile from credentials file: select a profile from your credentials file.

      • Explicit access key and secret key: enter your credentials manually.

  6. Optionally, you can connect to Schema Registry.

  7. If you want to use an SSH tunnel while connecting to Kafka, select Enable tunneling and in the SSH configuration list, select an SSH configuration or create a new one.

  8. Once you fill in the settings, click Test connection to ensure that all configuration parameters are correct. Then click OK.
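The SSL fields in the form map one-to-one onto standard Kafka client properties. A sketch of that mapping (placeholder paths and passwords; the helper name is illustrative, not part of the plugin):

```python
def ssl_properties(truststore_location, truststore_password,
                   keystore_location=None, keystore_password=None,
                   key_password=None, validate_hostname=True):
    """Build the Kafka SSL properties that the form fields correspond to
    (simplified model; placeholder values expected)."""
    props = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore_location,
        "ssl.truststore.password": truststore_password,
    }
    if not validate_hostname:
        # Clearing "Validate server host name" disables the hostname check:
        props["ssl.endpoint.identification.algorithm"] = ""
    if keystore_location:  # "Use Keystore client authentication"
        props["ssl.keystore.location"] = keystore_location
        props["ssl.keystore.password"] = keystore_password
        props["ssl.key.password"] = key_password
    return props

print(ssl_properties("/path/to/truststore.jks", "changeit",
                     validate_hostname=False))
```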

Kafka custom connection

Optionally, you can set up:

  • Enable connection: deselect if you want to disable this connection. By default, the newly created connections are enabled.

Connect to Kafka using properties

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. In the Name field, enter the name of the connection to distinguish it from other connections.

  3. In the Configuration source list, select Properties.

  4. In the Bootstrap servers field, enter the URL of the Kafka broker or a comma-separated list of URLs.

  5. Select the way to provide Kafka Broker configuration properties:

    • Implicit: paste the provided configuration properties, or enter them manually using the code completion and quick documentation that DataSpell provides.

    • From File: select the properties file.

  6. Optionally, you can connect to Schema Registry.

  7. If you want to use an SSH tunnel while connecting to Kafka, select Enable tunneling and in the SSH configuration list, select an SSH configuration or create a new one.

  8. Once you fill in the settings, click Test connection to ensure that all configuration parameters are correct. Then click OK.
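With From File, the plugin reads a Java-style .properties file. A minimal sketch of what such a file contains and how it parses (simplified: skips comments and blanks, ignores escape sequences and line continuations):

```python
def parse_properties(text: str) -> dict:
    """Very small .properties parser: skips blanks and comments,
    splits each line on the first '='."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

example = """
# Broker connection (placeholder hosts)
bootstrap.servers=broker1:9092,broker2:9092
security.protocol=PLAINTEXT
"""
print(parse_properties(example)["bootstrap.servers"])
```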

Optionally, you can set up:

  • Enable connection: deselect if you want to disable this connection. By default, the newly created connections are enabled.

Once you have established a connection to the Kafka server, a new tab with this connection appears in the Kafka tool window. You can use it to produce and consume data, create and delete topics. If you are connected to a Schema Registry, you can also view, create, and update schemas.

Click Connection Settings in any tab of the Kafka tool window to rename, delete, disable, or refresh the connection, or to modify its settings.

Kafka connection: topics

All the cluster topics are displayed in the Topics section. You can click Show Favorites to show only favorite topics or Show internal to show or hide internal topics. Click any topic to get more details on it, such as info on partitions, configuration, and schema.

Create a topic

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. Select Topics and click Add a topic (or press Alt+Insert).

  3. Name the new topic, specify the number of partitions and replication factor, and click OK.

Delete records from a topic

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. Under Topics, right-click a topic and select Clear Topic (or click Clear Topic to the left of it). Click OK to confirm deletion.

Produce and consume data

Produce data

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. Select a Kafka connection and click Create Producer.

    This will open a producer in a new editor tab.

  3. In the Topic list, select a topic to write messages to.

  4. Under Key and Value, select the message key and value. Use Generate Random Data to generate a random value based on the selected type.

    If you are connected to a Schema Registry, you can select Schema Registry to check the sent data against a selected schema.

  5. Under Headers, provide any custom headers. If you have them in JSON or CSV format, you can paste them into this section.

  6. Under Flow, you can control the record flow:

    • In Records at a time, enter a number if you want to send multiple records simultaneously.

    • Select Generate random keys and Generate random values if you want the record data to be randomly generated.

    • Set the Interval in milliseconds between sending records.

    • Provide Stop Conditions if you want the producer to stop sending messages when either a specified number of records is reached or a specified amount of time has elapsed.

  7. Under Options, provide additional options:

    • Partition: specify the topic partition to which the record is sent. If not specified, the default logic is used: the producer takes the hash of the key modulo the number of partitions.

    • Compression: select the compression type for data generated by the producer: None, Gzip, Snappy, Lz4, or Zstd.

    • Idempotence: select if you want to ensure that exactly one copy of each message is written in the stream.

    • Acks: select Leader if you want the leader to write the record to its local log and respond without awaiting full acknowledgement from all followers. Select All for the leader to wait for the full set of in-sync replicas to acknowledge the record. Select None if the producer should not wait for any acknowledgment from the server.

  8. Click Produce.
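The default partition selection described above (hash of the key modulo the partition count) can be sketched as follows. Note that the real Java producer hashes the serialized key with murmur2; MD5 here is only a deterministic stand-in:

```python
import hashlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Stand-in for the default partitioner: hash the key and take it
    modulo the number of partitions. (Kafka itself uses murmur2.)"""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Records with the same key always land on the same partition:
print(pick_partition(b"user-42", 6) == pick_partition(b"user-42", 6))  # True
```

This is why records without an explicit Partition setting still stay ordered per key: the same key always maps to the same partition.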

Produce messages in Kafka

You can then click any record in the Data tab to show its details. You can also click Show Statistics to enable statistics.

Consume data

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. Select a Kafka connection and click Create Consumer.

    This will open a consumer in a new editor tab.

  3. In the Topic list, select a topic to which you want to subscribe.

  4. Under Key and Value, select the data types for the keys and values of records that you are going to consume.

  5. Use Range and Filters to narrow down the data for consumption:

    • In the Start from list, select a period or offset from which you want to consume data. Select From the beginning to get all records from the topic.

    • In the Limit list, select when to stop receiving data, for example, when a certain number of records is reached in the topic.

    • Use Filter to filter records by substring in their keys, values, or headers.

  6. Under Partitions, enter a partition ID or a comma-separated list of IDs to get records from specific partitions only.

  7. Click Start Consuming.
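The Filter option matches a substring against record keys, values, or headers. A toy sketch of that kind of filtering over already-fetched records (the record layout here is hypothetical, for illustration only):

```python
def matches(record: dict, needle: str) -> bool:
    """Return True if the substring appears in the record's key,
    value, or any header value (hypothetical record layout)."""
    if needle in record.get("key", "") or needle in record.get("value", ""):
        return True
    return any(needle in v for v in record.get("headers", {}).values())

records = [
    {"key": "order-1", "value": '{"status": "paid"}', "headers": {"source": "web"}},
    {"key": "order-2", "value": '{"status": "pending"}', "headers": {"source": "mobile"}},
]
print([r["key"] for r in records if matches(r, "paid")])  # ['order-1']
```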

Consume messages in Kafka

You can then click any record in the Data tab to show its details. You can also click Show Statistics to enable statistics.

Save a producer or consumer preset

If you often produce or consume data with the same keys, values, headers, or other parameters, you can save them as a preset. You can then reuse presets to quickly create a producer or a consumer.

  1. In the Kafka tool window, click Create Producer or Create Consumer.

  2. Specify the needed parameters and, at the top of the producer or consumer creation form, click Save Preset (the favorite icon).

The parameters are saved as a preset, which is available in the Presets tab. Click a preset to apply it.

Work with Schema Registry

Producers and consumers can use schemas to validate and ensure consistency of their record keys and values. The Kafka plugin integrates with Schema Registry and supports Avro, Protobuf, and JSON schemas. It enables you to:

  • Connect to a Schema Registry

  • Create, update, delete, and clone schemas

  • Preview schemas in raw format or tree view

  • Compare schema versions

  • Delete schema versions

Connect to Schema Registry

  1. Create a connection to a Kafka Broker using cloud providers, a custom server, or properties.

  2. If you use Confluent, you can paste both Broker and Schema Registry properties into the Configuration field.

    Otherwise, expand the Schema Registry section and select a provider: Confluent or Glue.

    • URL: enter the Schema Registry URL.

    • Configuration source: select the way to provide connection parameters:

      • Custom: select the authentication method and provide credentials.

        If you want to use SSL settings different from those of the Kafka Broker, clear the Use broker SSL settings checkbox and provide the path for the truststore.

      • Properties: paste the provided configuration properties, or enter them manually using the code completion and quick documentation that DataSpell provides.

    • Region: select the Schema Registry region.

    • AWS Authentication: select the authentication method:

      • Default credential providers chain: use the credentials from the default provider chain. For more information about the chain, refer to Using the Default Credential Provider Chain.

      • Profile from credentials file: select a profile from your credentials file.

      • Explicit access key and secret key: enter your credentials manually.

    • Registry name: enter the name of a Schema Registry to which you want to connect or click Show Schema Registry to select it from the list.

  3. Once you fill in the settings, click Test connection to ensure that all configuration parameters are correct. Then click OK.

Create schema

  1. Open the Kafka tool window: Window | Tool Windows | Kafka.

  2. Select Schema Registry and click Create Schema (or press Alt+Insert).

  3. In the Format list, select the schema format: Avro, Protobuf, or JSON.

  4. In the Strategy list, select the naming strategy and, depending on the selected strategy, set up the name suffix or select a topic. Alternatively, select Custom name and enter any name.
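For Confluent-style registries, the naming strategy determines the subject under which the schema is registered: a topic name with a -key or -value suffix, a record name, or a fully custom name. A sketch of that mapping (a simplified model of the Confluent conventions; the function name is illustrative):

```python
def subject_name(strategy: str, topic: str = None,
                 record_name: str = None, custom: str = None) -> str:
    """Compute the Schema Registry subject for common naming strategies
    (simplified model of the Confluent conventions)."""
    if strategy == "topic-value":
        return f"{topic}-value"
    if strategy == "topic-key":
        return f"{topic}-key"
    if strategy == "record":
        return record_name
    if strategy == "custom":
        return custom
    raise ValueError(f"unknown strategy: {strategy}")

print(subject_name("topic-value", topic="orders"))  # orders-value
```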

Create Schema

You can preview schemas in tree or raw view.

Tree View
Raw View

Compare schema versions

  1. When connected to a Schema Registry, select a schema under Schema Registry.

  2. Switch to Raw View and click Compare. The button is available if a schema has more than one version.

Compare versions

Delete a schema version

If a schema has more than one version, you can delete a particular version. Schema Registry supports two types of deletion: soft (the schema metadata and ID remain in the registry after the version is deleted) and hard (all metadata, including schema IDs, is removed). Whether you can choose depends on whether you use Confluent or AWS Glue Schema Registry:

  • In Confluent Schema Registry, soft delete is used by default. You can choose to use a hard delete by selecting the Permanent deletion checkbox.

  • AWS Glue Schema Registry always uses a hard delete.

  1. Under Schema Registry, select a schema.

  2. To the right of it, click More and select Delete Version.
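In the Confluent Schema Registry REST API, soft and hard deletes use the same endpoint, with a hard delete adding a permanent=true query parameter. A sketch that only builds the request URLs (placeholder registry URL and subject; no request is actually sent):

```python
def delete_version_url(base_url: str, subject: str, version: int,
                       permanent: bool = False) -> str:
    """Build the Confluent Schema Registry DELETE endpoint for one
    schema version; permanent=True corresponds to a hard delete."""
    url = f"{base_url}/subjects/{subject}/versions/{version}"
    return url + "?permanent=true" if permanent else url

# Soft delete (default), then hard delete of the same version:
print(delete_version_url("http://localhost:8081", "orders-value", 2))
print(delete_version_url("http://localhost:8081", "orders-value", 2, permanent=True))
```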

Last modified: 23 August 2023