KafkaConsumer.poll()(Kafka v0.10 で導入)と Kafka SimpleConsumer.fetch()(Kafka v0.9 で導入)を使用して、Apache Kafka コンシューマアクティビティのエントリポイントを検出するように、AppDynamics Java エージェントを構成できます。バージョン 0.11 より前では、Kafka ペイロードには相関データを格納する場所はないため、エンドツーエンド ビジネス トランザクション相関は、Kafka クライアントとブローカーが 0.11 以降の場合のみ可能です。

KafkaConsumer.poll

KafkaConsumer.poll() を使用して Kafka コンシューマ エントリ ポイントをインストゥルメント化するには、コンシューマがカスタムインターセプタ定義のループ内のメッセージを読み取るメソッドを特定します。各メッセージのビジネストランザクションを開始して終了するために、反復子の次のメソッドがインストゥルメント化されます。反復するメッセージに使用される反復子はさまざまなものが考えられますが、次のタイプの反復子のみがサポートされています。

  • kafka.consumer.ConsumerIterator
  • org.apache.kafka.clients.consumer.ConsumerRecords$ConcatenatedIterable$1
  1. Kafkaからメッセージを処理するループのクラスとメソッドを識別します。たとえば、以下のループを使用して Kafka からメッセージをポーリングして処理するクラス MyConsumer があるとします。

    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    pollMessages(records);
    private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
       //AppDynamics instrumentation gets applied here
    for (ConsumerRecord<String, String> record : records) {
       //Processing of the records
       System.out.println(record.value());
       }
    }
    CODE

    この場合、次をインターセプトします。

    • クラス:MyConsumer
    • メソッド:pollMessages

    また、このインターセプタは、ループだけでなく個々のレコードを処理するメソッドにも適用できます。次に例を示します。

    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    pollMessages(records);
    private void pollMessages(ConsumerRecords<String, String>records) throws Exception {
       //AppDynamics instrumentation gets applied here
    for (ConsumerRecord<String, String> record : records) {
       processRecord(record)
       }
    }
    CODE
  2. 好みのテキストエディタを使用して、次のパスに custom-interceptors.xml という名前のファイルを作成して編集します。
    <agent_home>/<version_number>/conf 

    例:
    /usr/home/appdynamics/appagent/ver4.3.1.0/conf/custom-interceptors.xml
  3. 次の XML を custom-interceptors.xml にコピーします。

    <custom-interceptors>
        <custom-interceptor>
            <interceptor-class-name>com.singularity.KafkaMarkerMethodInterceptor</interceptor-class-name>
            <match-class type="matches-class">
                <name filter-type="equals">my-fully-qualified-class-name</name>
            </match-class>
            <match-method>
                <name>my-method-name</name>
            </match-method>
        </custom-interceptor>
    </custom-interceptors>
    XML
  4. クラス名の値をコンシューマクラスの名前に設定します。たとえば、MyConsumer クラスを指定するには以下のようにします。

    <match-class type="matches-class">
        <name filter-type="equals">com.mycompany.mypackage.MyConsumer</name>
    </match-class>
    XML
  5. メソッド名の値をメッセージ処理ループメソッドの名前に設定します。たとえば、pollMessages メソッドを指定するには、以下のようにします。

    <match-method>
        <name>pollMessages</name>
    </match-method>
    XML

    Javaエージェントは構成の更新を読み取ると、コンシューマアクティビティとアップストリームKafkaキューを検出します。アプリケーションフローマップには、Kafkaキューからデータを受信するティアが表示されます。


Kafka SimpleConsumerエントリポイント

SimpleConsumer.fetch() を使用してメッセージを取得する Kafka クライアントのコンシューマ エントリ ポイントを有効にするには、enable-kafka-consumer ノードプロパティを "true" 値で登録します。

この場合、Kafkaコンシューマのアクティビティが終了コールとして表示されます。