Skip To Content

Kafka Topic に接続して、JSON で受信

Kafka Topic に接続して、JSON で受信入力コネクタを使用すると、汎用 JSON として書式設定されたイベント データ レコードを Kafka Topic から取得して適合させることができます。 Apache Kafka の基本操作の詳細については、「Apache Kafka の概要」をご参照ください。

使用上の注意

Kafka Topic に接続して、JSON で受信入力コネクタを使用する際には、以下の点に注意してください。

  • この入力コネクタは、Kafka Topic から JSON データを受信する場合に使用します。 この入力コネクタは、Kafka の Consumer です。
  • この入力コネクタは、汎用 JSON インバウンド アダプターと Kafka インバウンド トランスポートを組み合わせます。
  • このアダプターは、フィーチャ JSON や GeoJSON とは異なり、汎用 JSON を解釈します。
  • 汎用 JSON レコードには、ジオメトリを表すデータを含める必要はありません。
  • アダプターは、単一の JSON レコードと、配列に整理された JSON レコードの両方を処理します。
  • アダプターでは、X、Y、Z 属性値からポイント ジオメトリを構築することができます。
  • この入力コネクタには [Learning Mode] パラメーターが含まれています。Learning Mode により、入力コネクタは構築したジオイベント定義を修正することができます。 このパラメーターの目的とは、受信したイベント データが可変的なスキーマやデータ構造を持つことを一時的に許可することです。 入力コネクタは、受信したデータ レコードのサンプルを使用して可変的なデータ構造について学習し、これまでに観測されなかった新しい属性フィールドを既存のジオイベント定義に付加します。
  • ジオイベント定義のリアルタイム変更を許可すると、ジオイベント サービスのリアルタイム解析の設計に悪影響が及ぶことがあります。 ベスト プラクティスとして、インバウンド イベント データでスキーマのばらつきが予期される場合は、Learning Mode をできる限り短い期間で使用し、インバウンド データで予期されるすべてのバリエーションをサポートするジオイベント定義を生成することをお勧めします。 次に、Learning Mode をオフにして、自動生成されたジオイベント定義をコピーし、運用環境に配置するために調整することができます。
  • Kafka インバウンド トランスポートは、Kafka クラスターまたは Broker で認証するために、TLS 1.2 および SASL セキュリティ プロトコルをサポートしています。

パラメーター

Kafka Topic に接続して、JSON で受信入力コネクタのパラメーターを次に示します。

パラメーター説明

名前

GeoEvent Manager で参照用として使用される入力コネクタの記述名。

カスタム Kafka プロパティでオーバーライド

デフォルトの GeoEvent Server Kafka クライアント プロパティをオーバーライドするかどうかを指定します。 デフォルトは [いいえ] です。

  • [はい] – トランスポートにより表示されるデフォルトの Kafka クライアント プロパティはオーバーライドされます。 有効な Kafka 構成が正しく書式設定された Kafka .properties ファイルを含む、GeoEvent Server に登録されたフォルダーを指定する必要があります。 サポートされている構成のリストと指定された .properties ファイルの推定される書式設定については、「Apache Kafka の構成」をご参照ください。
  • [いいえ] – トランスポートにより表示されるデフォルトの Kafka クライアント プロパティはオーバーライドされません。 [Kafka Bootstrap Servers][Consumer のグループ ID] を指定する必要があります。

Kafka Bootstrap Servers

(条件)

Kafka クラスターへの初期接続を確立するために使用される hostname:port ペアのリスト。 Hostname:port ペアは、カンマ区切りである必要があります (例: broker0.example.com:9092,broker1.example.com:9092,broker2.example.com:9092)。

このパラメーターは、[カスタム Kafka プロパティでオーバーライド][いいえ] に設定されている場合に表示されます。

Topic 名

対象データが使用される Kafka Topic 名または Kafka Topic のリスト。 複数の Topic を指定する際にはセミコロンで区切る必要があります。

  • topic1
  • topic1;topic2;topic3;topic4

注意:

複数の Kafka Topic を指定する機能は、ArcGIS GeoEvent Server 10.8 以降でサポートされます。

Consumer 数

Consumer グループごとの Consumer 数を指定します。 Consumer 数のデフォルトは 1 です。

注意:

Consumer 数は、Kafka トピックのパーティション数によって制限されます。 Consumer インスタンスの詳細については、「Apache Kafka の概要」をご参照ください。

Consumer のグループ ID

(条件)

一連の Consumer の Consumer グループを一意に識別するオプションの文字列。 これは Consumer グループ名とも呼ばれます。

Consumer のグループ ID を指定しない場合、GeoEvent Server は、geoevent-consumer と呼ばれる静的な Consumer のグループ ID を割り当てます。 この静的な Consumer のグループ ID は、Consumer のグループ ID が未指定のままにされている Kafka コネクタのすべてのインスタンスで共有されます。

カスタム Consumer グループ ID を指定することを強くお勧めします。 Consumer グループの詳細については、「Apache Kafka の概要」をご参照ください。

このパラメーターは、[カスタム Kafka プロパティでオーバーライド][いいえ] に設定されている場合に表示されます。

Kafka プロパティ ファイルの登録済みフォルダー

(条件)

Kafka .properties ファイルを含む GeoEvent Server に登録されたフォルダー。 Kafka .properties ファイルは、[カスタム Kafka プロパティでオーバーライド][はい] に設定されている場合にカスタム Kafka プロパティを定義します。 GeoEvent Server に登録されたフォルダーが、Kafka .properties ファイルが配置された場所への絶対パスであることを確認してください。

このパラメーターは、[カスタム Kafka プロパティでオーバーライド][はい] に設定されている場合に表示されます。

Kafka プロパティ ファイル名

(条件)

クライアント構成のカスタム Kafka プロパティを含む Kafka .properties ファイルの名前。 ファイルの名前は拡張子 .properties なしで指定する必要があります。

  • カスタム Kafka プロパティ ファイルの名前が「sample.properties」の場合、このパラメーターを「sample」と指定します。

このパラメーターは、[カスタム Kafka プロパティでオーバーライド][はい] に設定されている場合に表示されます。

最初から開始

レコードが最初のオフセットで始まる Topic から消費されるか、または Consumer にコミットした最後のオフセットから消費されるかを指定します。 デフォルトは [はい] です。

  • [はい] – レコードは最初のオフセットで始まる Topic から消費されます。
  • [いいえ] – レコードはコミットした最後のオフセットから消費されます。

注意:

オフセットの詳細については、「Apache Kafka の構成」をご参照ください。

JSON オブジェクト名

受信した JSON データ内の下部構造のルート ノードとして使用される JSON 要素の名前。 [JSON オブジェクト名] を使用して JSON 要素を名前で指定する場合、アダプターは、オブジェクト名が指定した要素名と一致する部分構造を検索します。 特定された部分構造内のデータだけが考慮されます。 この設定を空白のままにすると (デフォルト)、最上位の JSON オブジェクトが JSON 構造全体のルートとして使用されます。

ジオイベント定義の作成

インバウンド イベント データに新しいジオイベント定義を使用するか、既存のジオイベント定義を使用するかを指定します。 GeoEvent Server がインバウンド データ属性フィールドおよびデータ タイプを理解するには、ジオイベント定義が必要です。

  • [はい] – 最初に受信したイベント レコードのスキーマに基づき、新しいジオイベント定義が作成されます。
  • [いいえ] – 新しいジオイベント定義は作成されません。 インバウンド イベント データのスキーマに一致する、既存のジオイベント定義が選択されます。

ジオイベント定義名 (新規)

(条件)

新しいジオイベント定義に与えられる名前。 指定した名前を持つジオイベント定義がすでに存在する場合、既存のジオイベント定義が使用されます。 最初に受信したデータ レコードを使用し、後続のデータ レコードで予期されるスキーマを決定します。新しいジオイベント定義は、最初のデータ レコードのスキーマに基づいて作成されます。

このパラメーターは、[ジオイベント定義の作成][はい] に設定されている場合に表示され、[いいえ] に設定されている場合は非表示になります。

ジオイベント定義名 (既存)

(条件)

受信したデータを適合して、GeoEvent Service によって処理されるイベント データを作成する際に使用する、既存のジオイベント定義の名前。

このパラメーターは、[ジオイベント定義の作成][いいえ] に設定されている場合に表示され、[はい] に設定されている場合は非表示になります。

フィールドからジオメトリを作成

入力コネクタが、属性として受信した座標値を使用してポイント ジオメトリを作成するかどうかを指定します。 デフォルトは [いいえ] です。

  • [はい] – 指定したイベント属性の値を使用し、ポイント ジオメトリを作成します。
  • [いいえ] – ポイント ジオメトリは作成されません。 属性フィールドには、ジオメトリとして解釈される値が含まれているか、イベント レコードが非空間である (ジオメトリがない) と想定されます。

X ジオメトリ フィールド

(条件)

ポイント位置の X 座標部 (水平や経度など) を含むインバウンド イベント データの属性フィールド。

このパラメーターは、[フィールドからジオメトリを作成][はい] に設定されている場合に表示され、[いいえ] に設定されている場合は非表示になります。

Y ジオメトリ フィールド

(条件)

ポイント位置の Y 座標部 (垂直や緯度など) を含むインバウンド イベント データの属性フィールド。

このパラメーターは、[フィールドからジオメトリを作成][はい] に設定されている場合に表示され、[いいえ] に設定されている場合は非表示になります。

Z ジオメトリ フィールド

(条件)

ポイント位置の Z 座標部 (深さや高度など) を含むインバウンド イベント データのフィールド名。 空白のままにすると、Z 値が省略されて、2D ポイント ジオメトリが作成されます。

このパラメーターは、[フィールドからジオメトリを作成][はい] に設定されている場合に表示され、[いいえ] に設定されている場合は非表示になります。

ラーニング モード

[ラーニング モード] を有効にするか無効にするかを指定します。 [ラーニング モード][はい] に設定されている場合、インバウンド アダプターが作成し、維持しているジオイベント定義に新しいフィールドが付加されます。

  • [はい] – ラーニング モードが有効になります。 ジオイベント定義は、異なるスキーマを持つイベント レコードの新規フィールドで更新されます。
  • [いいえ] – ラーニング モードは有効になりません。 ジオイベント定義は修正されません。

[ラーニング モード] は、入力コネクタが構築したジオイベント定義を修正できるようにする場合に便利です。 このパラメーターの目的とは、受信したイベント データが可変的なスキーマやデータ構造を持つことを一時的に許可することです。 入力コネクタは、受信したデータ レコードのサンプルを使用して可変的なデータ構造について学習し、これまでに観測されなかった新しい属性フィールドを既存のジオイベント定義に付加します。

ジオイベント定義のリアルタイム変更を許可すると、ジオイベント サービスのリアルタイム解析の設計に悪影響が及ぶことがあります。 インバウンド イベント データでスキーマのばらつきが予期される場合は、[ラーニング モード] をできる限り短い期間で使用し、インバウンド データで予期されるすべてのバリエーションをサポートするジオイベント定義を生成することをお勧めします。 その後でラーニング モードをオフにし、自動生成されたジオイベント定義をコピーして、本番デプロイメント用に調整することができます。

デフォルト空間参照

座標値が、想定されている WGS84 地理座標系システムの緯度と経度の値ではない属性フィールド値をもとにジオメトリを作成するか、受信したジオメトリ文字列に空間参照が含まれない場合に使用される空間参照の Well-Known ID (WKID)。 WKID または Well-Known Text (WKT) を含む属性フィールドの WKT 値または名前が指定されることもあります。

推定される日付形式

日付時間値として予期される文字列表現を一致させ、それを Java Date 値に変換するために使用されるパターン。 パターンの形式は、Java SimpleDateFormat クラス変換に従います。

GeoEvent Server では、日付/時間値を ISO 8601 標準で表すことを推奨していますが、日付値として一般的に認識されている日付/時間値の文字列表現のいくつかは、[推定される日付形式] パターンを指定しなくても Java Date 値に変換することができます。 次に例を示します。

  • "2019-12-31T23:59:59" – ISO 8601 標準形式
  • 1577836799000 – Java Date (紀元からの時間を示す Long 型整数、UTC)
  • "Tue Dec 31 23:59:59 -0000 2019" – 一般的な Web サービス文字列形式
  • "12/31/2019 11:59:59 PM" – 米国で使用される一般的な形式 (12 時間形式)
  • "12/31/2019 23:59:59" – 米国で使用される一般的な形式 (24 時間形式)

受信した日付/時間の値が、上記の 5 つ以外の形式で表される場合は、GeoEvent Server が日付/時間値を適合する方法を把握できるよう、推定される日付形式パターンを指定する必要があります。

GeoJSON として

受信ジオメトリを、フィーチャ JSON ではなく GeoJSON ジオメトリ オブジェクトとして解析するかどうかを指定します。 デフォルトでは、ジオメトリを文字列として受信した場合、「ジオメトリ オブジェクト」で示すように、値はフィーチャ JSON になります。

  • [はい] – 受信ジオメトリはフィーチャ JSON ではなく GeoJSON として解析されます。
  • [いいえ] – 受信したジオメトリの文字列表現はフィーチャ JSON であると想定されます。

認証が必要

Kafka クラスターまたは Kafka Broker への接続に認証が必要かどうかを指定します。 デフォルトは [いいえ] です。

  • [はい]Kafka クラスターまたは Broker への接続に認証が必要です。
  • [いいえ]Kafka クラスターまたは Broker への接続に認証は不要です。

認証方法

(条件)

Kafka クラスターのセキュリティ保護に使用されるセキュリティ プロトコルを指定します。 利用できるセキュリティ プロトコルは TLS 1.2 および SASL です。

  • [TLS 1.2]Kafka クラスターが使用するセキュリティ プロトコルは TLS 1.2 です。 Kafka クラスターの PKI ファイル (x509 証明書) が、ArcGIS GeoEvent Server が構成されている ArcGIS Server のトラスト ストアにインポートされていることを確認してください。 証明書のインポートに関する具体的な手順については、「既存の CA 署名証明書での ArcGIS Server の構成」の「ArcGIS Server への証明書のインポート」セクションをご参照ください。
  • [SASL]Kafka クラスターによって使用されるセキュリティ プロトコルは、SASL です。 SASL、SSL、および Kerberos のみがサポートされています。

注意:

Kerberos を使用する場合、ArcGIS GeoEvent Server を実行しているオペレーティング システムのユーザー アカウントに、Kerberos セットアップ/構成内のキータブ ファイルへの読み取りアクセス権があることを確認してください。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。

認証情報ファイルの登録済みフォルダー

(条件)

Kafka クラスターの PKI ファイル (x509 証明書) を含む GeoEvent Server に登録されたフォルダー。 GeoEvent Server に登録されたフォルダーが、Kafka クラスターの証明書が配置された場所への絶対パスであることを確認してください。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは TLS 1.2 にのみ適用されます。

認証情報の構成ファイル

(条件)

Kafka クラスターの PKI ファイル (x509 証明書) の名前。 この証明書とそれに関連するプライベート キーは、拡張子 .p12 または .pfx を持つファイルで表される PKCS#12 形式で保存する必要があります。 ファイルの名前と拡張子を指定します。

  • my_kafka_certificate.pfx
  • my_other_kafka_certificate.p12

注意:

このパラメーターでは、証明書ファイル名と拡張子のみがサポートされます。 証明書への相対パスはこのパラメーターに指定しないでください。 [認証情報ファイルの登録済みフォルダー] パラメーターを使用して証明書ファイルへの絶対パスを登録します。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは TLS 1.2 にのみ適用されます。

キーストアのパスワード

(条件)

Kafka クラスターの PKI ファイル (x509 証明書) のパスワード。 これは証明書のプライベート キーとも呼ばれます。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 このパラメーターは、TLS 1.2 にのみ適用されます。

SASL 認証タイプ

(条件)

Kafka クラスターがサポートする SASL 認証メカニズムのタイプを指定します。 利用可能な SASL 認証タイプには、SASL GSSAPI (Kerberos) と SASL PLAIN が含まれます。

  • [SASL/GSSAPI (Kerberos)]Kafka クラスターは SASL/GSSAPI Kerberos 認証を使用します。
  • [SASL/PLAIN]Kafka クラスターは SASL/PLAIN 認証を使用します。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは SASL にのみ適用されます。

Kerberos プリンシパル

(条件)

特定ユーザーの Kerberos プリンシパル (例: GeoEventKafkaClient1@example.com)。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 このパラメーターは、SASL/GSSAPI (Kerberos) にのみ適用されます。

キー タブの使用

(条件)

キータブを Kerberos 設定で使用するかどうかを指定します。 デフォルトは [はい] です。

  • [はい] – キータブは Kerberos 設定で使用されます。
  • [いいえ] – キータブは Kerberos 設定で使用されません。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは SASL/GSSAPI (Kerberos) にのみ適用されます。

キーの保存

(条件)

キーを Kerberos 設定に保存するかどうかを指定します。 デフォルトは [はい] です。

  • [はい] – キーは Kerberos 設定に保存されます。
  • [いいえ] – キーは Kerberos 設定に保存されません。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは SASL/GSSAPI (Kerberos) にのみ適用されます。

ユーザー名

(条件)

Kafka クラスターでの認証に使用されるユーザー名を指定します。 クラウド プロバイダーによっては、接続文字列とも呼ばれます。 正しい構文については選択したクラウド プロバイダーのドキュメントをご参照ください。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 これは SASL/PLAIN にのみ適用されます。

パスワード

(条件)

Kafka クラスターでの認証に使用されるパスワードを指定します。 正しい構文については選択したクラウド プロバイダーのドキュメントをご参照ください。

このパラメーターは、[認証が必要][はい] に設定されている場合に表示されます。 このパラメーターは、SASL/PLAIN にのみ適用されます。

検討事項および制限事項

Kafka Topic に接続して、JSON で受信入力コネクタを使用する際には、以下の点に注意してください。

  • Consumer の管理と最適化が十分でないと、Kafka Topic に接続して、JSON で受信入力コネクタの特定のインスタンスでデータを取得できなくなります。 Consumer グループ内の Consumer 数は、Kafka トピックのパーティション数によって制限されます。 Consumer グループ内の Consumer 数が Kafka Topic のパーティション数を超えると、余った Consumer がデータを利用できなくなります。 これを防ぐには、Kafka Topic のパーティション数と最適に一致するように Consumer の数を最適化するか、コネクタごとに異なる Consumer グループを実装することを検討してください。 Consumer と Consumer グループの詳細については、「Kafka のドキュメント」をご参照ください。
  • Kafka Topic に接続して、JSON で受信入力コネクタは、Kafka のクライアント Consumer です。 Kafka の他のクライアント Consumer に必要な検討事項と同じ内容をこの入力コネクタに適用してください。 たとえば、この入力コネクタは Kafka Topic からデータを受信していないが Kafka の別のクライアント Consumer は受信している場合は、2 つのクライアント Consumer に関連する要因を検討します。 これには、構成された Consumer グループ ID、Topic で使用可能なパーティションの数、既存の Consumer の数が含まれますが、これらに限定されません。 また、この入力コネクタが立て続けに停止と開始を繰り返す場合は、Consumer の視点から Kafka の影響を検討します。 Kafka Topic のパーティションを再調整すると、この入力コネクタが同じ Consumer グループの Consumer としてすぐに再結合できなくなる可能性があります。