Skip To Content

Subscribe to a Kafka Topic for JSON

The Subscribe to a Kafka Topic for JSON input connector can be used to retrieve and adapt event data records, as generic JSON, from a Kafka Topic. For more information about getting started with Apache Kafka, see Apache Kafka Introduction.

Usage notes

Keep the following in mind when working with the Subscribe to a Kafka Topic for JSON input connector:

  • Use this input connector to consume JSON data from a Kafka Topic. This input connector is a consumer of Kafka.
  • This input connector pairs the Generic-JSON inbound adapter with the Kafka inbound transport.
  • The adapter interprets generic JSON as opposed to feature JSON or GeoJSON.
  • A generic JSON record does not have to contain data that represents a geometry.
  • The adapter will handle both single JSON records and JSON records organized in an array.
  • The adapter supports the ability to construct a point geometry from x, y, and z attribute values.
  • This input connector includes a Learning Mode parameter which can be used to allow the input connector to modify a GeoEvent Definition it has constructed. The purpose of this parameter is to temporarily accept that event data received will have a variable schema or data structure. The input connector will use a sample of received data records to learn more about the variable data structure and append new, previously unobserved, attribute fields to an existing GeoEvent Definition.
  • Allowing a GeoEvent Definition to be changed on the fly can negatively impact the design of real-time analysis in a GeoEvent Service. As a best practice, it is recommended that if schema variance is expected in your inbound event data that you use the learning mode for as brief a period of time as possible to produce a GeoEvent Definition that supports all expected variants of your inbound data. The learning mode can then be turned off and the autogenerated GeoEvent Definition will be copied and tailored for production deployment.
  • The Kafka inbound transport supports TLS 1.2 and SASL security protocols for authenticating with a Kafka cluster or broker.

Parameters

The following are the parameters for the Subscribe to a Kafka Topic for JSON input connector:

ParameterDescription

Name

A descriptive name for the input connector used for reference in GeoEvent Manager.

Override with Custom Kafka Properties

Specify whether to override the default GeoEvent Server Kafkaclient properties. The default is No.

  • Yes—The default Kafka client properties exposed by the transport will be overridden. A folder registered with GeoEvent Server must be specified that contains a Kafka.properties file with the correct formatting for a valid Kafkaconfiguration. See Apache Kafka Configuration for a list of supported configurations and expected formatting for the specified .properties file.
  • No—The default Kafka client properties exposed by the transport will not be overridden. Kafka Bootstrap Servers and Consumer Group ID must be specified.

Kafka Bootstrap Servers

(Conditional)

A list of hostname:port pairs used to establish the initial connection to the Kafka cluster. Hostname:port pairs must be comma separated, for example, broker0.example.com:9092,broker1.example.com:9092,broker2.example.com:9092.

The parameter is shown when Override with Custom Kafka Properties is set to No.

Topic Name(s)

The name of a Kafka topic, or list of Kafka topics, to consume data of interest from. Multiple topics must be separated by a semicolon.

  • topic1
  • topic1;topic2;topic3;topic4

Remarque :

The ability to specify multiple Kafka topics is supported in ArcGIS GeoEvent Server 10.8 and later.

Number of Consumers

Specifies the number of consumers for each consumer group. The default number of consumers is 1.

Remarque :

The number of consumers is limited by the number of partitions on the Kafka topic. See Apache Kafka Introduction for more information on consumer instances.

Consumer Group ID

(Conditional)

An optional string that uniquely identifies the consumer group for a set of consumers. This is also known as the consumer group name.

If a Consumer Group ID is unspecified, GeoEvent Server will assign a static consumer group ID called geoevent-consumer. This static consumer group ID is shared across all instances of the Kafka connector where the Consumer Group ID is left unspecified.

It is highly recommended that you specify a custom Consumer Group ID. Refer to Apache Kafka Introduction for more information about consumer groups.

The parameter is shown when Override with Custom Kafka Properties is set to No.

Registered Folder for the Kafka Properties File

(Conditional)

The folder registered with GeoEvent Server that contains the Kafka .properties file. The Kafka .properties file defines the custom Kafka properties when Override with Custom Kafka Properties is set to Yes. Ensure that the folder registered with GeoEvent Server is the full path to where the Kafka .properties file is located.

The parameter is shown when Override with Custom Kafka Properties is set to Yes.

Kafka Properties File Name

(Conditional)

The name of the Kafka .properties file that contains the custom Kafka properties for client configuration. The name of the file should be specified without the .properties extension.

  • If the name of the custom Kafka properties file is sample.properties, specify this parameter as sample.

The parameter is shown when Override with Custom Kafka Properties is set to Yes.

Start from Beginning

Specifies whether records are consumed from the topic starting at the beginning offset or from the last offset committed for the consumer. The default is Yes.

  • Yes—Records will be consumed from the topic at the beginning offset.
  • No—Records will be consumed from the last offset committed.

Remarque :

For more information about offsets, see Apache Kafka Configuration.

JSON Object Name

The name of a JSON element that can be used as the root node of a substructure within the received JSON data. When JSON Object Name is used to specify a JSON element by name, the adapter will search for substructures whose object name matches the specified element name. Only data within the identified substructure will be considered. When left blank, which is the default, the uppermost JSON object is used as the root of the entire JSON structure.

Create GeoEvent Definition

Specifies whether a new or existing GeoEvent Definition should be used for the inbound event data. A GeoEvent Definition is required for GeoEvent Server to understand the inbound event data attribute fields and data types.

  • Yes—A new GeoEvent Definition will be created based on the schema of the first event record received.
  • No—A new GeoEvent Definition will not be created. Select an existing GeoEvent Definition that matches the schema of the inbound event data.

GeoEvent Definition Name (New)

(Conditional)

The name assigned to a new GeoEvent Definition. If a GeoEvent Definition with the specified name already exists, the existing GeoEvent Definition will be used. The first data record received will be used to determine the expected schema of subsequent data records, a new GeoEvent Definition will be created based on that first data record's schema.

The parameter is shown when Create GeoEvent Definition is set to Yes and is hidden when set to No.

GeoEvent Definition Name (Existing)

(Conditional)

The name of an existing GeoEvent Definition to use when adapting received data to create event data for processing by a GeoEvent Service.

The parameter is shown when Create GeoEvent Definition is set to No and is hidden when set to Yes.

Construct Geometry from Fields

Specifies whether the input connector should construct a point geometry using coordinate values received as attributes. The default is No.

  • Yes—Values from specified event attribute fields will be used to construct a point geometry.
  • No—A point geometry will not be constructed. It is assumed an attribute field contains a value that can be interpreted as a geometry or the event record is nonspatial (does not have a geometry).

X Geometry Field

(Conditional)

The attribute field in the inbound event data containing the x coordinate part (for example horizontal or longitude) of a point location.

The parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.

Y Geometry Field

(Conditional)

The attribute field in the inbound event data containing the y coordinate part (for example vertical or latitude) of a point location.

The parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.

Z Geometry Field

(Conditional)

The name of the field in the inbound event data containing the z coordinate part (for example depth or altitude) of a point location. If left blank, the z value will be omitted and a 2D point geometry will be constructed.

The parameter is shown when Construct Geometry from Fields is set to Yes and is hidden when set to No.

Learning Mode

Specifies whether Learning Mode is active or disabled. When Learning Mode is set to Yes, the inbound adapter will append new fields to a GeoEvent Definition it has created and is maintaining.

  • Yes—Learning mode will be enabled. The GeoEvent Definition will be updated with new fields from event records sharing different schemas.
  • No—Learning mode will not be enabled. The GeoEvent Definition will not be modified.

Learning Mode can be useful when you need to allow the input connector to modify a GeoEvent Definition it has constructed. The purpose of this parameter is to temporarily accept that event data received will have a variable schema or data structure. The input connector will use a sample of received data records to learn more about the variable data structure and append new, previously unobserved, attribute fields to an existing GeoEvent Definition.

Allowing a GeoEvent Definition to be changed on-the-fly can adversely impact the design of real-time analytics in a GeoEvent Service. If schema variance is expected in your inbound event data, it is recommended that you use Learning Mode for as brief a period of time as possible to produce a GeoEvent Definition that supports all expected variants of your inbound data. Learning mode can then be turned off and the automatically generated GeoEvent Definition copied and tailored for production deployment.

Default Spatial Reference

The well-known ID (WKID) of a spatial reference to be used when a geometry is constructed from attribute field values whose coordinates are not latitude and longitude values for an assumed WGS84 geographic coordinate system, or geometry strings are received that do not include a spatial reference. A well-known text (WKT) value or the name of an attribute field containing the WKID or WKT may also be specified.

Expected Date Format

The pattern used to match expected string representations of date/time values and convert them to Java Date values. The pattern's format follows the Java SimpleDateFormat class convention.

While GeoEvent Server prefers date/time values to be expressed in the ISO 8601 standard, several string representations of date/time values commonly recognized as date values can be converted to Java Date values without specifying an Expected Date Format pattern. These include the following:

  • "2019-12-31T23:59:59"—The ISO 8601 standard format
  • 1577836799000—Java Date (epoch long integer; UTC)
  • "Tue Dec 31 23:59:59 -0000 2019"—A common web services string format
  • "12/31/2019 11:59:59 PM"—Common format used in the United States (12-hour clock)
  • "12/31/2019 23:59:59"—Common format used in the United States (24-hour clock)

If the date/time values received are expressed using a convention other than one of the five shown above, you will have to specify an expected date format pattern so GeoEvent Server knows how the date/time values should be adapted.

As GeoJSON

Specifies whether to parse the incoming geometry as a GeoJSON geometry object rather than as feature JSON. The default assumes that when a geometry is received as a string, the value will be feature JSON as described in Geometry objects.

  • Yes—The incoming geometry will be parsed as GeoJSON rather than feature JSON.
  • No—Received string representations of a geometry are assumed to be feature JSON.

Authentication Required

Specifies whether the connection to the Kafka cluster, or Kafka broker, requires authentication. The default is No.

  • Yes—Authentication is required to the Kafka cluster or broker.
  • No—Authentication is not required to connect to the Kafka cluster or broker.

Authenticate Using

(Conditional)

Specifies the security protocol that is used to secure the Kafka cluster. Available security protocols include TLS 1.2 and SASL.

  • TLS 1.2—The security protocol used by the Kafka cluster is TLS 1.2. Ensure that the Kafka cluster's PKI file (x509 certificate) is imported into the trust store of the ArcGIS Server with which ArcGIS GeoEvent Server is configured. For details, see the Import the certificate into ArcGIS Server section in Configure ArcGIS Server with an existing CA-signed certificate for specific instructions on importing certificates.
  • SASL—The security protocol used by the Kafka cluster is SASL. Only SASL, SSL, and Kerberos are supported.

Remarque :

When using Kerberos, ensure the operating system user account running ArcGIS GeoEvent Server has read access to the keytab file in the Kerberos setup and configuration.

The parameter is shown when Authentication Required is set to Yes.

Registered Folder for Credential File

(Conditional)

The folder registered with GeoEvent Server that contains the Kafka cluster's PKI file (x509 certificate). Ensure that the folder registered with GeoEvent Server is the full path to where the Kafka cluster's certificate is located.

The parameter is shown when Authentication Required is set to Yes. It is applicable to TLS 1.2 only.

Credential Configuration File

(Conditional)

The name of the Kafka cluster's PKI file (x509 certificate). The certificate and its associated private key must be stored in the PKCS#12 format, which is represented by a file with either the .p12 or .pfx extension. Provide the name of the file in addition to the extension.

  • my_kafka_certificate.pfx
  • my_other_kafka_certificate.p12

Remarque :

Only the certificate file name and extension are supported for this parameter. Relative paths to the certificate should not be specified in this parameter. Register the full path to the certificate file using the Registered Folder for Credential File parameter.

The parameter is shown when Authentication Required is set to Yes. It is applicable to TLS 1.2 only.

Keystore Password

(Conditional)

The password for the Kafka cluster's PKI file (x509 certificate). This is also known as the certificate's private key.

The parameter is shown when Authentication Required is set to Yes. The parameter is applicable to TLS 1.2 only.

SASL Authentication Type

(Conditional)

Specifies the type of SASL authentication mechanism supported by the Kafka cluster. Available SASL authentication types include SASL GSSAPI (Kerberos) and SASL PLAIN.

  • SASL/GSSAPI (Kerberos)—The Kafka cluster uses SASL/GSSAPI Kerberos authentication.
  • SASL/PLAIN—The Kafka cluster uses SASL/PLAIN authentication.

The parameter is shown when Authentication Required is set to Yes. It is applicable to SASL only.

Kerberos Principal

(Conditional)

The Kerberos principal for the specific user, for example, GeoEventKafkaClient1@example.com.

The parameter is shown when Authentication Required is set to Yes. The parameter is applicable to SASL/GSSAPI (Kerberos) only.

Use Key Tab

(Conditional)

Indicates whether to use the keytab in the Kerberos settings. The default is Yes.

  • Yes—The keytab will be used in the Kerberos settings.
  • No—The keytab will not be used in the Kerberos settings.

The parameter is shown when Authentication Required is set to Yes. It is applicable to SASL/GSSAPI (Kerberos) only.

Store Key

(Conditional)

Indicates whether to store the key in the Kerberos settings. The default is Yes.

  • Yes—The key will be stored in the Kerberos settings.
  • No—The key will not be stored in the Kerberos settings.

The parameter is shown when Authentication Required is set to Yes. It is applicable to SASL/GSSAPI (Kerberos) only.

Username

(Conditional)

Specifies the username used to authenticate with the Kafka cluster. This is also known as a connection string with certain cloud providers. Refer to the documentation of the chosen cloud provider for correct syntax.

The parameter is shown when Authentication Required is set to Yes. It is applicable to SASL/PLAIN only.

Password

(Conditional)

Specifies the password used to authenticate with the Kafka cluster. Refer to the documentation of the chosen cloud provider for the correct syntax.

The parameter is shown when Authentication Required is set to Yes. The parameter is applicable to SASL/PLAIN only.

Considerations and limitations

There are several considerations to keep in mind when using the Subscribe to a Kafka Topic for JSON input connector:

  • Insufficiently managing and optimizing consumers will result in certain instances of the Subscribe to a Kafka Topic for JSON input connector not retrieving data. The number of consumers, in a consumer group, is limited by the number of partitions on a Kafka Topic. If the number of consumers in a consumer group exceeds the number of partitions available on a Kafka Topic, the excess consumers will not consume data. Consider optimizing the number of consumers to best align with the number of partitions on the Kafka Topic or implement different consumer groups for each connector to avoid this. For more information on consumers and consumer groups, see the Kafka Documentation.
  • The Subscribe to a Kafka Topic for JSON input connector is a client consumer of Kafka. Apply the same considerations to this input connector as would be required for any other client consumer of Kafka. For example, if this input connector is not receiving data from a Kafka Topic but a separate client consumer of Kafka is, consider the factors that are involved with having two client consumers. This includes but is not limited to the configured consumer group ID, number of partitions available on the Topic, and the number of existing consumers. Alternatively, if the input connector is stopped and started in quick succession, consider the implications for Kafka from a consumer point of view. A rebalancing of the Kafka Topic's partitions may prevent the input connector from immediately rejoining as a consumer under the same consumer group.