Code Visualizer

Visualizing ImportService with DOT Notation

This document illustrates the architecture of the `ImportService` class in TypeScript, showcasing its Kafka interactions, methods, and metrics through a DOT notation diagram for better comprehension and analysis.


Empty image or helper icon

Prompt

import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import {
    Batch,
    ConsumerConfig,
    ConsumerRunConfig,
    ConsumerSubscribeTopics,
    EachBatchPayload, KafkaMessage,
} from 'kafkajs';
import { KafkaService } from '@kafka/kafka.service';
import { DbBatch } from '@import/batches';
import { IMPORT_PARAMS_TOKEN } from '@import/constants';
import { DbBatchFactory } from './db-batch.factory';
import { KafkaValueDto } from '@import/dto';
import { NonRetriableException } from '@kafka/non-retriable.exception';
import { plainToInstance } from 'class-transformer';
import { Logger } from 'nestjs-pino';
import { PrometheusService } from '@r1-backend/prometheus';
import { Counter } from 'prom-client';

export type ImportServiceConfig = {
    topicsConfig: ConsumerSubscribeTopics;
    runConfig?: Omit;
    consumerConfig: ConsumerConfig;
}

@Injectable()
export class ImportService implements OnModuleInit {

    private static readonly MESSAGE_RECEIVED_COUNTER = 'message_received_total';

    private static readonly MESSAGE_PROCESSED_COUNTER = 'message_processed_total';

    private static readonly MESSAGE_FAILED_COUNTER = 'message_failed_total';

    private static readonly BATCH_RECEIVED_COUNTER = 'batch_received_total';

    private static readonly BATCH_PROCESSED_COUNTER = 'batch_processed_total';

    private static readonly BATCH_FAILED_COUNTER = 'batch_failed_total';

    private readonly messageReceivedCounter = new Counter({
        name: ImportService.MESSAGE_RECEIVED_COUNTER,
        help: 'Количество полученных сообщений из kafka',
        labelNames: ['topic', 'table', 'operation'],
    });

    private readonly messageProcessedCounter = new Counter({
                  name: ImportService.MESSAGE_PROCESSED_COUNTER,
                  help: 'Количество успешно обработанных сообщений из kafka',
                  labelNames: ['topic', 'table', 'operation'],
              });

    private readonly messageFailedCounter = new Counter({
                  name: ImportService.MESSAGE_FAILED_COUNTER,
                  help: 'Количество сообщений из kafka, которые не удалось обработать',
                  labelNames: ['topic', 'table', 'operation'],
              });

    private readonly batchReceivedCounter = new Counter({
                    name: ImportService.BATCH_RECEIVED_COUNTER,
                    help: 'Количество полученных пакетов[batch] из kafka',
                    labelNames: ['topic', 'table', 'operation'],
                });

    private readonly batchProcessedCounter = new Counter({
                    name: ImportService.BATCH_PROCESSED_COUNTER,
                    help: 'Количество успешно обработанных пакетов[batch] из kafka',
                    labelNames: ['topic', 'table', 'operation'],
                });

    private readonly batchFailedCounter = new Counter({
                    name: ImportService.BATCH_FAILED_COUNTER,
                    help: 'Количество пакетов[batch] из kafka, которые не удалось обработать',
                    labelNames: ['topic', 'table', 'operation'],
                });

    constructor(
        private readonly kafkaService: KafkaService,
        private readonly dbBatchFactory: DbBatchFactory,
        private readonly logger: Logger,
        @Inject(IMPORT_PARAMS_TOKEN) private readonly config: ImportServiceConfig,
        private readonly prometheusService: PrometheusService,
    ) {
        this.prometheusService.registerMetrics([
            this.messageReceivedCounter,
            this.messageProcessedCounter,
            this.messageFailedCounter,
            this.batchReceivedCounter,
            this.batchProcessedCounter,
            this.batchFailedCounter,
        ]);
    }

    onModuleInit(): Promise {
        const { topicsConfig, runConfig = {}, consumerConfig } = this.config;
        return this.kafkaService.consume(
            topicsConfig,
            {
                ...runConfig,
                eachBatch: this.processBatch.bind(this),
            },
            consumerConfig,
        );
    }

    async processBatch({
        batch,
        resolveOffset,
        isRunning,
        isStale,
        heartbeat,
    }: EachBatchPayload): Promise {

        const { topic } = batch;

        this.batchReceivedCounter.inc({ topic });
        let dbBatch: DbBatch | undefined;
        let lastSavedOffset: string | undefined;
        try {
            for (const message of batch.messages) {
                if (!isRunning() || isStale()) {
                    break;
                }

                const kafkaMessageDto = await this.parseKafkaMessage(message, batch);
                this.messageReceivedCounter.inc({
                    topic,
                    operation: kafkaMessageDto.operation,
                    table: kafkaMessageDto.table,
                });

                if (!dbBatch) {
                    try {
                        dbBatch = this.dbBatchFactory.createDbBatch(kafkaMessageDto);
                        this.messageProcessedCounter.inc({
                            topic,
                            operation: kafkaMessageDto.operation,
                            table: kafkaMessageDto.table,
                        });
                    } catch (e) {
                        this.messageFailedCounter.inc({
                            topic,
                            operation: kafkaMessageDto.operation,
                            table: kafkaMessageDto.table,
                        });
                        // eslint-disable-next-line max-depth
                        if (e instanceof NonRetriableException) {
                            this.logger.warn(e.message, {
                                topic,
                                partition: batch.partition,
                                offset: message.offset,
                            });
                        }

                        continue;
                    }
                }

                if (await dbBatch.add(kafkaMessageDto)) {
                    continue;
                }

                await dbBatch.commit();
                this.batchProcessedCounter.inc({
                    topic,
                    operation: kafkaMessageDto.operation,
                    table: kafkaMessageDto.table,
                });
                lastSavedOffset = message.offset;
                dbBatch = undefined;
                await heartbeat();
            }

            await dbBatch?.commit();
        } catch (e) {
            this.batchFailedCounter.inc({ topic });
            if (lastSavedOffset !== undefined) {
                resolveOffset(lastSavedOffset);
            }
            throw e;
        }
    }

    private async parseKafkaMessage(message: KafkaMessage, batch: Batch): Promise {
        const { value: rawValue } = message;
        if (rawValue === null) {
            throw new NonRetriableException('Kafka value is empty');
        }

        const value = await this.kafkaService.parseMessage(message, batch);

        return plainToInstance(KafkaValueDto, value, {
            strategy: 'excludeAll',
            exposeDefaultValues: true,
            enableImplicitConversion: true,
        });
    }

}

Answer

Code Visualization Using DOT Notation

Overview

This document provides a visual representation of the relationships within the provided TypeScript code, which defines an ImportService class using decorators and Kafka messaging concepts. The code is structured, showing how different components interact.

Dependencies and Imports

The code imports various dependencies necessary for Kafka operations and services:

  • NestJS Common: For dependency injection and module lifecycle callbacks.
  • KafkaJS: For Kafka message handling.
  • Other Services and DTOs: Custom services, constants, and data transfer objects.

Code Structure in DOT Notation

digraph ImportService {
    node [shape=box];

    subgraph cluster_import_service {
        label = "ImportService";
        style=filled;
        color=lightgrey;

        ImportService [label="ImportService"];
        OnModuleInit [label="OnModuleInit"];
        processBatch [label="processBatch"];
        parseKafkaMessage [label="parseKafkaMessage"];
        
        ImportService -> OnModuleInit [label="implements"];
        ImportService -> processBatch [label="contains"];
        ImportService -> parseKafkaMessage [label="contains"];

        messageReceivedCounter [label="messageReceivedCounter"];
        messageProcessedCounter [label="messageProcessedCounter"];
        messageFailedCounter [label="messageFailedCounter"];
        batchReceivedCounter [label="batchReceivedCounter"];
        batchProcessedCounter [label="batchProcessedCounter"];
        batchFailedCounter [label="batchFailedCounter"];

        ImportService -> messageReceivedCounter;
        ImportService -> messageProcessedCounter;
        ImportService -> messageFailedCounter;
        ImportService -> batchReceivedCounter;
        ImportService -> batchProcessedCounter;
        ImportService -> batchFailedCounter;

        KafkaService [label="KafkaService"];
        DbBatchFactory [label="DbBatchFactory"];
        Logger [label="Logger"];
        PrometheusService [label="PrometheusService"];
        IMPORT_PARAMS_TOKEN [label="IMPORT_PARAMS_TOKEN"];

        ImportService -> KafkaService;
        ImportService -> DbBatchFactory;
        ImportService -> Logger;
        ImportService -> PrometheusService;
        ImportService -> IMPORT_PARAMS_TOKEN;

        DbBatch [label="DbBatch"];
        NonRetriableException [label="NonRetriableException"];
        KafkaValueDto [label="KafkaValueDto"];

        processBatch -> DbBatch [label="uses"];
        processBatch -> NonRetriableException [label="throws"];
        parseKafkaMessage -> KafkaValueDto [label="returns"];
    }

    subgraph cluster_kafka {
        label = "Kafka Dependencies";
        style=dashed;

        KafkaMessage [label="KafkaMessage"];
        Batch [label="Batch"];
        EachBatchPayload [label="EachBatchPayload"];

        KafkaMessage -> EachBatchPayload [label="passed as"];
        Batch -> EachBatchPayload [label="used in"];
    }
}

Key Components

  1. ImportService: The main service class managing Kafka interactions.

    • Implements OnModuleInit to start the consumer on initialization.
    • Contains methods for processing message batches.
  2. Metrics Counters: Metrics for monitoring message and batch handling.

    • messageReceivedCounter, messageProcessedCounter, etc., track various states of message processing.
  3. Kafka Interaction:

    • The service uses KafkaService to consume topics, DbBatchFactory to create database batches, and utilizes Logger for logging warnings and errors.
  4. Error Handling:

    • Uses NonRetriableException to handle specific error cases during processing.

Summary

The code represents a structured NestJS service for interacting with Kafka, with clear delineation of responsibilities for processing messages and batches. The DOT representation outlines the relationships between class methods, metrics, and external dependencies, providing a visual overview of the code’s architecture.

For further in-depth training on such concepts, consider engaging with the courses available on the Enterprise DNA Platform.

Create your Thread using our flexible tools, share it with friends and colleagues.

Your current query will become the main foundation for the thread, which you can expand with other tools presented on our platform. We will help you choose tools so that your thread is structured and logically built.

Description

This document illustrates the architecture of the ImportService class in TypeScript, showcasing its Kafka interactions, methods, and metrics through a DOT notation diagram for better comprehension and analysis.