Kafka レシーバー 🔗
Kafkaレシーバーは、Splunk Distribution of OpenTelemetry Collectorが、Kafkaからメトリクスとログを(OTLP形式で)収集できるようにします。メッセージペイロードエンコーディングが設定可能です。サポートされるパイプラインタイプは、metrics
、logs
、traces
です。詳細は パイプラインでデータを処理する を参照してください。
注釈
すぐに使えるダッシュボードとナビゲーターは、まだKafkaレシーバーではサポートされていませんが、将来のリリースではサポートされる予定です。
はじめに 🔗
以下の手順に従って、コンポーネントの設定とアクティベーションを行ってください:
Splunk Distribution of the OpenTelemetry Collector をホストまたはコンテナプラットフォームにデプロイします:
次のセクションで説明するようにKafkaレシーバーを設定します。
Collector を再起動します。
サンプル構成 🔗
レシーバーをアクティブにするには、設定ファイルの kafka
セクションに receivers
を追加します:
receivers:
kafka:
protocol_version: 2.0.0
設定を完了するには、設定ファイルの service
セクションの1つ以上のパイプラインに、レシーバーを含めます。例:
service:
pipelines:
metrics:
receivers: [kafka]
主な設定 🔗
以下の設定が必要です:
protocol_version
。Kafkaプロトコルバージョン、たとえば2.0.0
。
以下の設定はオプションです:
brokers
。デフォルトではlocalhost:9092
です。Kafkaブローカーのリスト。resolve_canonical_bootstrap_servers_only
。デフォルトではfalse
です。起動時にブローカーIPを解決してから逆引きするかどうか。topic
。デフォルト:トレースはotlp_spans
、メトリクスはotlp_metrics
、ログはotlp_logs
。読み込み元のKafkaトピックの名前。1つのトピックには1つのテレメトリタイプしか使用できません。encoding
。デフォルトではotlp_proto
です。Kafkaから受信したペイロードのエンコーディング。以下のエンコーディングが利用可能です:otlp_proto
。ペイロードはそれぞれExportTraceServiceRequest
、ExportLogsServiceRequest
またはExportMetricsServiceRequest
にデシリアライズされます。jaeger_proto
。ペイロードは、単一のJaeger protoSpan
にデシリアライズされます。jaeger_json
。ペイロードは、jsonpb
を使用して、単一のJaeger JSON Spanにデシリアライズされます。zipkin_proto
。ペイロードはZipkinプロトスパンのリストにデシリアライズされます。zipkin_json
。ペイロードはZipkin V2 JSONスパンのリストにデシリアライズされます。zipkin_thrift
。ペイロードはZipkin Thriftスパンのリストにデシリアライズされます。raw
。ログのみ。ペイロードのバイトはログレコードの本文として挿入されます。text
。ログのみ。ペイロードはテキストとしてデコードされ、ログレコードの本文として挿入されます。デフォルトでは、デコードにUTF-8を使用します。この動作をカスタマイズするには、text_utf-8
やtext_shift_jis
などのtext_<ENCODING>
を使用できます。json
。ログのみ。ペイロードはJSONとしてデコードされ、ログレコードの本文として挿入されます。azure_resource_logs
。ログのみ。ペイロードはAzure Resource LogsフォーマットからOTLPに変換されます。
group_id
。デフォルトではotel-collector
です。受信者がメッセージを消費するコンシューマーグループ。client_id
。デフォルトではotel-collector
です。消費者クライアントID。initial_offset
。デフォルトではlatest
です。以前にコミットしたオフセットがない場合に使用する初期オフセット。指定可能な値はlatest
またはearliest
です。auth
。以下のオプションを使用して認証できます。plain_text
。以下のフィールドがあります:username
。使用するユーザー名。password
。使用するパスワード。
sasl
。以下のフィールドがあります:username
。使用するユーザー名。password
。使用するパスワード。mechanism
。使用するSASLメカニズム:SCRAM-SHA-256
、SCRAM-SHA-512
、AWS_MSK_IAM
またはPLAIN
。aws_msk.region
。AWS_MSK_IAM
を使用している場合は、AWSリージョン。aws_msk.broker_addr
。AWS_MSK_IAM
を使用する場合、MSKブローカーのアドレス。
tls
。以下のフィールドがあります:ca_file
。insecure
がfalse
に設定されている場合のみ使用します。CA証明書へのパス。クライアントでは、サーバー証明書を検証します。cert_file
。insecure
がfalse
に設定されている場合のみ使用します。TLSが必要な接続に使用するTLS認証へのパス。key_file
。insecure
がfalse
に設定されている場合のみ使用します。TLSが必要な接続に使用するTLSキーへのパス。insecure
。デフォルトではfalse
です。tls設定で、サーバーの証明書チェーンとホスト名、InsecureSkipVerify
の検証を無効にします。server_name_override
。バーチャルホスティングをサポートするためにクライアントが要求したサーバー名を示します。
kerberos
。以下のフィールドがあります:service_name
。Kerberosサービス名。realm
。Kerberosレルム。use_keytab
。true
の場合、パスワードの代わりにkeytabが使われます。username
。KDCでの認証に使用するKerberosユーザー名。password
。KDCでの認証に使用するKerberosパスワード。config_file
。Kerberos設定へのパス、例えば/etc/krb5.conf
。keytab_file
。keytabファイルへのパス、例えば/etc/security/kafka.keytab
。disable_fast_negotiation
。デフォルトではfalse
です。PA-FX-FASTネゴシエーション(Pre-Authentication Framework - Fast)を無効にします。一部の一般的なKerberos実装はPA-FX-FASTネゴシエーションをサポートしていません。
metadata
。以下のフィールドがあります:full
。デフォルトではtrue
です。メタデータ一式を保持するかどうか。無効にすると、クライアントは起動時にブローカーに最初のリクエストを行いません。retry
。以下のフィールドがあります:max
。デフォルトでは3
です。メタデータ取得の再試行回数。backoff
。デフォルトでは250ms
。メタデータのリトライの間隔。
autocommit
。以下のフィールドがあります:enable
。デフォルトではtrue
です。更新されたオフセットをブローカーに自動コミットするかどうか。interval
。デフォルトでは1s
です。更新されたオフセットをコミットする頻度。auto-commit
が有効になっていな場合、無効です。
message_marking
。以下のフィールドがあります:after
。デフォルトではfalse
です。true
の場合、メッセージはパイプラインが実行された後にマークされます。on_error
。デフォルトではfalse
です。false
の場合、正常に処理されたメッセージだけがマークされます。処理されたメッセージが恒久的なエラーを返した場合、パーティション全体がブロックされる可能性があることに注意してください。
header_extraction
。ヘッダーの抽出方法を決定します。以下のフィールドがあります:extract_headers
。デフォルトではfalse
です。true
の場合、ヘッダーフィールドはリソース属性にアタッチされます。headers
。デフォルトでは[]
。Kafkaレコードから抽出したいヘッダーのリスト。マッチするパターンはexact
。今のところ、正規表現はサポートされていません。
設定例:SASLとTLSを使用してKafkaに接続する 🔗
この例では、SASLとTLSを使用してKafkaに接続するレシーバーを設定する方法を示します:
receivers:
kafka:
auth:
sasl:
username: "user"
password: "secret"
mechanism: "SCRAM-SHA-512"
tls:
insecure: false
設定例:ヘッダーを抽出 🔗
この例では、レシーバーを設定してヘッダーを抽出する方法を示します:
receivers:
kafka:
topic: test
header_extraction:
extract_headers: true
headers: ["header1", "header2"]
レシーバーに次の test
をフィードした場合:
{
event: Hello,
headers: {
header1: value1,
header2: value2,
}
}
以下のログレコードを取得します。
{
...
body: Hello,
resource: {
kafka.header.header1: value1,
kafka.header.header2: value2,
},
...
}
以下が該当します:
Kafkaレコードヘッダー
header1
とheader2
がリソースの属性に追加されます。一致するKafkaヘッダーキーはすべて、
kafka.header
の文字列が先頭に付加され、リソースの属性に付加されます。
設定 🔗
次の表に、Kafkaレシーバーの設定オプションを示します:
トラブルシューティング 🔗
Splunk Observability Cloudをご利用のお客様で、Splunk Observability Cloudでデータを確認できない場合は、以下の方法でサポートを受けることができます。
Splunk Observability Cloudをご利用のお客様
Splunk サポートポータル でケースを送信する
Splunkサポート に連絡する
見込み客および無料トライアルユーザー様
Splunk Answers のコミュニティサポートで質問し、回答を得る
Join the Splunk #observability user group Slack channel to communicate with customers, partners, and Splunk employees worldwide. To join, see Chat groups in the Get Started with Splunk Community manual.