Cloud deploy

skaffold의 렌더링을통해 쿠베네티스 환경 배포 관리

skaffold 란?

skkafold 는 BUILD,DEPLOY,RENDERING 하여 배포를 관리

skaffold rendering이란?

  • manifest를 rendering하여 Kubernetes deployment 환경에게 전달
  • rendring의 역할은 yaml파일의 value 값을 치환해 전달

예시

deployment.yaml 의 컨테이너 이미지 app-image라는 값을 쿠베네티스 배포 환경에서 실제 도커 허브의 저장소 위치로 변환시켜준다.
따라서 rendering시 우리는 이미지 경로를 skaffold에게 이미지 경로를 parameter값으로 넘겨준다.


apiVersion: apps/v1
kind: Deployment
metadata:
  name: helloworld-app
spec:
  template:
    spec:
      containers:
        - name: hello-world-apps
          image: app-image # <---- replace

rendering시 helm, kustomize 관리도구를 사용할 수 있다.
https://skaffold.dev/docs/renderers/

Cloud deploy 란?

Cloud deploy는 파이프라인, 타겟으로 구성이 되어있어 skaffold를 이용하여 쿠베네티스 환경에 배포한다.

  • 타겟은 배포되야할 워커 노드풀을 정한다
  • 파이프라인은 배포 대상,배포 전략을 정한다
  • 파이프라인,타겟(cloud-deploy)를 구성 하면 1 : N의 관계로 릴리즈라는것을 생성할수 있다.
  • 릴리즈를 통해 롤백 및 배포 관리를 한다.
  • 파이프라인에 profiles(skaffold.yaml에 설정)을 구분하여 target에 배포한다.

젠킨스 비유

  • GCP cloud deploy 파이프라인 - 젠킨스 job launcher
  • GCP Cloud deploy 릴리즈 - 젠킨스 job 히스토리

파이프라인, 타겟 구성하기

  1. clouddeploy.yaml 정의
apiVersion: deploy.cloud.google.com/v1
kind: DeliveryPipeline
metadata:
 name:
 annotations:
 labels:
description: # 설명
suspended: #true: 릴리즈 재사용 금지 default: false
serialPipeline: # pipeline 정의 시작
 stages:
 - targetId: #아래 kind Target의 metadata.name 이 들어감
   profiles: [] # manifest profile 환경 skaffold -profile sandbox
# Deployment strategies
# One of:
#   standard:
#   canary:
# See the strategy section in this document for details.
   strategy:
     standard:
       verify: #배포 확인 skaffold.yaml verfiy스탠자 필요
       predeploy:
         actions: []
       postdeploy:
         actions: []
   deployParameters:
   - values:
     matchTargetLabels:
 - targetId:
   profiles: []
   strategy:
   deployParameters:
---

apiVersion: deploy.cloud.google.com/v1
kind: Target
metadata:
 name:
 annotations:
 labels:
description:
multiTarget:
 targetIds: []
deployParameters:
requireApproval:
#
# Runtimes
# one of the following runtimes:
gke:
 cluster:
 internalIp:
#
# or:
anthosCluster:
 membership:
#
# or:
run:
 location:
# (End runtimes. See documentation in this article for more details.)
#
executionConfigs:
- usages:
  - [RENDER | PREDEPLOY | DEPLOY | VERIFY | POSTDEPLOY]
  workerPool:
  serviceAccount:
  artifactStorage:
  executionTimeout:
  1. cloud deploy.yaml(pipeline, target) 배포 명령어 (https://cloud.google.com/sdk/gcloud/reference/deploy/apply)
    gcloud deploy apply --file=clouddeploy.yaml \ --region=asia-northeast3 \ --project={PROJECT_ID}

skaffold RENDERING 예제

  apiVersion: skaffold/v4beta7
  kind: Config
  metadata:
    name: helloworld-app
  profiles:
    - name: sandbox # cloud deploy stages profile과 동일시하기
      manifests:
        kustomize:
          paths:
            - overlays/sandbox
    - name: dev # cloud deploy stages profile과 동일시하기
      manifests:
        kustomize:
          paths:
            - overlays/dev
  manifests:
    kustomize:
      paths:
        - overlays/dev

  apiVersion: skaffold/v4beta6
  kind: Config
  manifests:
    helm:
      releases:
        - name: RELEASE_NAME
          chartPath: PATH_TO_HELM_CHART
          artifactOverrides:
            image: IMAGE_NAME

github action을 통한 Cloud deploy release 생성 명령어

cloud deploy create release-> cloud build -> skaffold를 통해 렌더링된 manifset -> 쿠베네티스 배포

      - name: Create release in Google Cloud Deploy
        run: |
          gcloud deploy releases create r-${{ inputs.service-name }}-api-${{ env.IMAGE_TAG }} \
                --skaffold-file ${{ inputs.skaffold-file-path }} \
                --delivery-pipeline ${{ inputs.delivery-pipeline }} \
                --to-target ${{ inputs.env }} \
                --region ${{ inputs.location }} \
                --annotations commitId=${{ github.sha }} \
                --images app-image=${{ inputs.base-app-image }}/${{ inputs.service-name }}/api:${{ env.IMAGE_TAG }}

참고자료

https://cloud.google.com/deploy/docs/using-skaffold?hl=ko#how_does_use_skaffold

Cloud Deploy에서 Skaffold는 어떻게 사용되나요?

  • 출시 버전을 만들 때 컨테이너 이미지에 대한 렌더링 소스 및 참조가 Cloud Deploy로 전달됩니다.
  • 출시 버전을 만들 때 Cloud Deploy는 Cloud Build를 호출하고, Cloud Build는 다시 skaffold diagnose 및 skaffold render를 호출해서 해당 렌더링 소스 및 이미지를 사용하여 매니페스트를 렌더링합니다.

python-replication-mysql는 Master에 최대한 부하를 주지 않고 데이터 분석, CDC를 가능하게 만드는 프로젝트이다.

국내기업에서는 카카오, 원티드 해외기업에서는 아마존에서 CDC로 사용되고있는 오픈소스 프로젝트이다.

 

개인적으로 아래 재밌게 본 영상인데 해당 DB장애도 BINLOG 기반으로 복구하였다.

python-replication-mysql도 Binlog 바탕으로 DDL, DML을 python 객체로 다시한번 wrapping 해주는 프로젝트로 보면된다.

그렇다면 같은 상황에서 python-replication-mysql로 DB를 복구해 나갈수 있었을까?

정답은 그때는 안된다. 하지만 오픈소스 기여로 이젠 할수있다.

1. 문제 원인

위와 같은 비슷한 예제이다

 

1) 내가 매일 9시에 새로운 테이블을 생성하고 이전 테이블을 Drop 시키는 일을 한다.
2) 10시에 이전 Drop된 테이블에 대한 정보를 Binlog기반으로 백업용 DB에 다시 넣는다.

 

2번을 작업할때 python-replication-mysql 오픈소스를 사용하면 문제가 발생한다.

원인은 현재 데이터베이스 테이블의 구조에 의지하기 때문이다.

자세히 설명하자면 TableMapEvent가 발생시 Binlog의 패킷을 통해 schema,table 이름의 값을 얻어올 수 있는데 이를 바탕으로 실제 Master DB에 아래와 같이 쿼리를 날린다.

 

SELECT
    COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
    COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
    DATA_TYPE, CHARACTER_OCTET_LENGTH
FROM
    information_schema.columns
WHERE
    table_schema =":schema_name" and table_name = ":table_name"
ORDER BY ORDINAL_POSITION

 


따라서 Drop된 테이블에 대해서 현재 데이터베이스의 테이블이 존재하지 않기 때문에  DELETE,UPDATE,INSERT시 칼럼 맵핑을 하지못하는 치명적인 문제가 발생한다.

2. 해결방법

이벤트 시점의 테이블 칼럼정보를 흭득해야 한다.

  1. CDC 사용
    • Debezium으로 Kafka connect를 사용하여 DDL 변경건에 대해서 Consume한다.
    한계점
    • kafka에 의존적인 관계 생성
    • kafka consume이 매번 python-replication-mysql 애플리케이션 이벤트 받는시점보다 앞서서 받는것을 보장해야한다.
  2. Table Map Event의 optional metadata 사용 ✅
    • Mysql 8.0부터 optional metadata를 제공
    • optional metadata는 이벤트 발생시점의 칼럼 정보를 Binlog에 저장해서 전달해줌
    • binlog_row_metadata ='FULL' 변수 설정해야함

1번은 치명적인 문제가있었다.
kafka consumer는 Binlog의 이벤트가 들어오는 순서보다 무조건 먼저 들어와야된다는 것을 보장해야한다.
DDL 변경된 것이 더 늦는다면 Debezium을 붙이는 의미가 없기 때문에 2번으로 결정했다.


2번의 한계도 분명히 존재했는데 MYSQL 8.0 이전 사용자들은 binlog_row_metadata를 설정할 수 없다.

3. 개발 계획

개발을 진행하기 앞서 더이상 BINLOG의 패킷을 통해 들어온 정보들을 제외하고 실제 데이터베이스에 의존적이면 안된다고 생각했다.

위 SELECT에서 받아온 정보 DATA_TYPE, COLUMN_COMMENT, CHARACTER_OCTET_LENGTH이 당장의 이득이 될수 있어도 먼 미래를 생각한다면 다 제거하는게 옳다고 생각했다.

 

메인테이너에게 이 주제에 대해 얘기를 나누고 MYSQL 5.7 사용자들에게는 기존 0.44 버전까지만을 제공하자고 제안했다.

https://github.com/julien-duponchelle/python-mysql-replication/issues/473

 

How about remove information about self.column_schemas? · Issue #473 · julien-duponchelle/python-mysql-replication

What do you think about removing the information about self.column_schemas? Currently we are getting the value of Column schema from information_schema and using it for Column object information (d...

github.com


제안에 대한 답변

 

 


따라서 총 3가지 작업을 진행했다.

1. 현재 데이터베이스의 Column Schema 의존성 제거

2. OptionalMetaData 추출

3. 추출한 값을 토대로 이벤트 시점의 column 업데이트

 

4. 보충 설명

optional_meta_data는 event 발생시점의 Columns의 정보가 Binlog에 저장되어있다.

실제 Binlog 패킷을 파씽해서 데이터를 얻을 수 있다.

 

파씽 결과

 

=== OptionalMetaData ===
unsigned_column_list: [False, False]
default_charset_collation: None
charset_collation: {}
column_charset: []
column_name_list: ['id', 'json_data']
set_str_value_list : []
set_enum_str_value_list : []
geometry_type_list : []
simple_primary_key_list: [0]
primary_keys_with_prefix: {}
visibility_list: [True, True]
charset_collation_list: []
enum_and_set_collation_list: []

 

5. 마치며

위 이슈에 대해 같은 고민을 했던 오픈소스 사용자들의 반응도 흥미로웠다.
python-replication-mysql 이 0버전대가 아닌 1버전대로 올라가는 계기도 되었다.

 

 

 

Reference


 

MySQL: binary_log::Table_map_event::Optional_metadata_fields Struct Reference

Metadata_fields organizes m_optional_metadata into a structured format which is easy to access. More... Metadata_fields organizes m_optional_metadata into a structured format which is easy to access.

dev.mysql.com

 

TABLE_MAP_EVENT

Precedes each row operation event and maps a table definition to a number.

mariadb.com

 

카프카 컨수머를 공부하다보면 많은 비슷한 단어로 혼동이온다. 

인터넷에 찾아봐도 잘 나오지 않고 왜 이런 명칭이 나왔는지 배경과 코드를 보며 알아봤습니다.

 

 

1. ClientId

 

공식문서 

An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

 

요청을 할 때 서버에 전달할 ID 문자열입니다.

이는 서버 측 요청 로깅에 논리적 애플리케이션 이름이 포함되도록 허용하여 단순한 IP/포트를 넘어 요청의 출처를 추적할 수 있도록 하기 위한 것입니다.

 

목적

브로커에 찍힌 컨수머의 로깅 추척 

 

배경

아래의  링크 Description을 보면 브로커에서 어느 컨슈머에서 요청을 한것인지 request log를 보는데 어느 컨수머인지 확인이 불편했습니다.

많은 컨수머 중 어떤 컨수머 그룹에서 요청을 보냈는지 알 수 없었던 것입니다. 따라서 ClientId가 컨수머 그룹 ID 정보까지 저장하는 형태로 변경하기로 했습니다.

 

kafka 관련 Issue

 

코드 level

 

기존 ClientId 할당

기존에는 ClientId 값을 Auto increment로 할당을 해주는것을 볼수있습니다.

변경된 ClientId 할당

변경된 코드 부분에서는 GroupId(컨수머 그룹) 값도 넣는것을 볼 수 있습니다.

ConsumerConfig로직

원래 ClientId 할당 로직이 KafkaConsumer 생성자에 있었지만 현재는 ConsumerConfig 클래스에서 책임을 갖도록 리팩토링되었습니다.

 


2. member.id

 

member.id에 대해 정확히 나온곳이없어서 카프카 이슈를 둘러보다가 발견했습니다.

 

A new consumer joins the group with member.id field set as UNKNOWN_MEMBER_ID (empty string), since it needs to receive the identity assignment from broker first. For request with unknown member id, broker will blindly accept the new join group request, store the member metadata and return a UUID to consumer.

 

새 Consumer는 브로커로부터 신원 할당을 먼저 받아야 하므로 member.id 필드가 UNKNOWN_MEMBER_ID(빈 문자열)로 설정된 상태로 그룹에 가입합니다. 알수없는 member.id 요청의 경우, 브로커는 새 그룹 가입 요청을 맹목적으로 수락하고 멤버 메타데이터를 저장한 후 소비자에게 UUID를 반환합니다.

 

목적

group Coordinator(broker 중 하나)과  컨수머 JoinGroup 요청시 고유 식별자 memberId를 보내 사용됩니다.

컨수머 하트비트 전송시에도 memberId를 보내 줍니다.

https://www.confluent.io/blog/apache-kafka-data-access-semantics-consumers-and-membership/

kafka 관련 Issue

 

코드 level

memberId는 컨수머 그룹에 등록됐을때 초기화 됩니다.

memberId 할당 부분을 봅시다.

 

GroupCoordinator 코드

memberId가 Consumer group 요청시 초기 memberId값은 UNKNOWN_MEMBER_ID이다.

그 이후 doNewMemberJoinGroup 로직으로 들어가게 됩니다.

 

doCurrentMemberJoinGroup 로직 부분은 4번 group.instance.id와 연관이있어 뒤에서 설명하겠습니다.

GroupCoordinator 코드

 

ClientId값과 UUID의 혼합값으로 할당합니다.


3. group.id

 

공식문서

 

A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.

 

이 소비자가 속한 소비자 그룹을 식별하는 고유 문자열입니다. 이 속성은 소비자가 subscribe(topic)을 사용하여 그룹 관리 기능을 사용하거나 카프카 기반 오프셋 관리 전략을 사용하는 경우 필요합니다.

 

목적 

컨수머 토픽의 논리적 그룹

 

배경

구독하는 토픽에대해 컨수머들을 묶어서 관리해야하는데 필요한  KafkaConfig 설정입니다.

 


4. group.instance.id

 

공식문서

 

A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.

 

최종 사용자가 제공한 소비자 인스턴스의 고유 식별자입니다. 비어 있지 않은 문자열만 허용됩니다. 설정하면 소비자는 정적 멤버로 취급되며, 이는 소비자 그룹에서 언제든지 이 ID를 가진 인스턴스 하나만 허용됨을 의미합니다. 이는 일시적인 사용 불가(예: 프로세스 재시작)로 인한 그룹 재조정을 방지하기 위해 더 큰 세션 시간 초과와 함께 사용할 수 있습니다. 설정하지 않으면 소비자는 기존 동작인 동적 멤버로 그룹에 참여하게 됩니다.

 

목적

리밸런스시 파티션이 재분배 되지 않게하기 위한 설정

 

배경

2.3 버전 이후 생긴 개념입니다.

 

컨수머 그룹에 새로운 컨수머가 들어오거나 떠나게되면 파티션 리밸런스 현상이 발생합니다. 그 영향으로 컨수머 그룹 안 컨수머들은 모두 stop the world 현상에 빠지게됩니다. 그동안 브로커에는 토픽이 쌓여 들어온 데이터와 commit offset의 lag는 증가하게 될겁니다.

컨수머가 다운됐을때는 리밸런스가 무조건 일어나야하지만 내가 의도한 상황(ex. 재기동)일때 파티션이 다시 재분배 상태일때는 너무 큰 비용을 낭비하게됩니다.

이런 문제점을 해소하기 위해 정적 멤버라는 개념이 추가됐습니다.

KafkaConsumerConfig에 group.instance.id를 기입하면 정적 멤버라고 인식하게되어 컨수머 그룹 이탈시 LeaveGroup 호출을 하지 않게됩니다. 

정적 멤버가 떠나며 LeaveGroup을 호출하는 유일한 경우는 session.time.out으로 인해서만 발생하게됩니다.

 

 

kafka 관련 이슈

 

코드 level

GroupCoodinator 코드

여기서 instanceId(group.instance.id) 값의 유무를 통해 컨수머 그룹 JoinGroup시 정적멤버, 동적멤버 생성로직이 분기되어 관리되는것을 볼 수 있습니다.

 

GroupCoodinator 코드

정적멤버가 그룹에 재참여할때는 doCurrentMemberJoinGroup의 로직으로 관리되는것을 볼수 있습니다.

 

파티션 분배

빨간색은 동적멤버의 상황의 파티션 분배이고 초록색은 정적멤버의 파티션 분배로 동일하게 분배받은것을 볼수 있습니다.

 


정리

client.id, group.id, group.instance.id는 Config값으로 입력할 수 있고 

member.id는 값을 입력할수 없고 내부적으로 할당되어 브로커와 컨수머간 서로 통신을 위한 값이다.

 

카프카의 개념을 이해했다는 전제하에 작성한 글입니다. 


브로커의 데이터를 가져오기 위하여 보통 spring-kafka의 @kafkaListener 어노테이션을 등록하여 사용하거나 아래와 같이 poll 메서드를 작성하는 경우가 많습니다.
컨수머에 대한 아키텍처 구성에 대해서 알아보고 어떤 흐름을 통해 데이터를 컨숨할수 있을지 알아보려고 합니다.

public class Consumer {
    public void kafkaClient() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("consumer"));
 
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
    }

 

 

Kafka는 용어가 많기 때문에 혼란이 되어서 키워드 별로 하는 일을 정확히 명시해야 이해하기 쉽습니다.

GroupCoordinator : 브로커 중 하나로 컨슈머 그룹의 메타데이터와 그룹을 관리
ConsumerCoordinator : HeartbeatThread 생성, 컨수머 그룹 파티션 재할당
ConsumerNetworkClient : 컨수머의 Request를 NetworkClient로 보내는 주체
NetworkClient : ConsumerNetworkClient에게 받아 브로커에게 통신을 보내는 주체
Fetcher : 브로커 노드에게 데이터 요청하는 주체
SubscriptionState : 컨수머의 구독상태 오프셋 상태 관리

 

Consumer는 데이터만을 가져오게 감쳐줘 있지만 사실 데이터를 가져오기 위해서는 위 정리한 키워드 GroupCoordinator(Broker 객체)를 제외하고 객체들이 필요합니다.

kafkaConsumer 객체 생성시 초기화되는 객체들

 

 

전체적인 큰 그림을 보기 위하여 Consumer가 ConsumerNetworkClient에게 Request를 보낼 것들이 뭐가 있을까 생각해봅시다.

 

컨수머는 GroupCoordinator의 정보에 의존하여 메타데이터를 받아옵니다.
브로커가 여러개있을 경우(=브로커의 서버가 여러대) 브로커 중 GroupCoordinator를 찾는 요청도 있을 겁니다.

기본적으로 우리의 첫 번째 목적인 브로커에 있는 데이터 요청 도 있을 겁니다.
또한 Consumer가 받아온 데이터를 읽었다는 처리 Flag(commit) 를 브로커에게 넘겨주는 요청도 있을 겁니다.

이외에도 사실 요청이 굉장히 많습니다.

 

위 얘기를 꺼낸 이유는 이 모든 요청을 보내기 위해
ConsumerNetworkClient의 unsent라는 Map<node,newClientRequest> 자료구조에 담아놓습니다.
노드별 원하는 Request를 설정하여 map에 쌓여있는 것들을 NetworkClient에게 보내고 Broker한테서 응답이 오면
NetworkClient에게 Response를 받고 원하는 데이터 형태를 셋팅하여 우리가 원하는 일들을 수행합니다.

 

 

들어가기 앞서 poll의 핵심 로직 코드만 설명하고있습니다. 참고해주세요

1.  Fetcher에게 Consumer에 할당된 Topic partition을 가져오라고 명령합니다.

Fetcher#sendFetches

sendFetches는 크게 4가지의 역할을 합니다

  1. 노드 별 FetchRequest 준비
  2. 노드별 FetchRequest.Builder Setup
  3. ConsumerNetworkClient에게 request 전송
  4. Future Reqeust 성공 실패 callback 메서드 주입

1번에서는 내가 읽는 Topic partition의 offset들을 어디까지 읽었고 그 이후부터 Broker에 저장되어있는 메시지의 offset을 가져갈거야라고 준비를 하는 단계입니다.

 

2번에서는 ConsumerNetworkclient에게 요청을 보낼 때 위에서 말한 여러 가지의 종류의 Request가 존재하기 때문에 Request의 목적은 FetchRequest를 전달이라는 것을 선언하는 줄입니다. (Request의 추상화)

 

3번에서는 실제 ConsumerNetworkClient에게 request를 보냅니다.

 

4번에서는 3번에서의 요청이 성공했을 때 실패했을 때를 나눠 callback메서드를 주입해줍니다.
(interface를 통한 익명 클래스 사용)

 

4번에 대한 추가 설명

더보기

 

RequestFutere의 interface

 

성공시 Callback메서드 주입

해당 Callback메서드가 호출되는 시점은 누군가에 의해 성공했다고 판단되었고 (여기서 누군가 = 나중에 나오겠지만  ConsumerNetworkClient의 자료구조 map에 담긴 pendingcompletion에서 꺼내어 판단합니다)

onSuccess의 매개변수에 ClientResponse가 담아져 있었을 겁니다.

 

따라서 1번 코드 부분에서는 브로커 메시지 정보가 담겨있는 응답에 대해 파티션에 대해 데이터를 정리하는 역할입니다.

2번 코드 부분에서는 completedFetches라는 곳에 실제 완료된 데이터를 담아둡니다.

 

결과적으로 Client(KafkaConsumer)가 poll을 호출했을 때 completedFetches의 데이터를 가져가 반환합니다. 


2. ConsumerNetworkClient의 send 요청을 보낸다.

위 1번의 3번 코드 부분이 이어서 진행되는 부분입니다.


ConsumerNetworkClient#send

ConsumerNetworkClinet#send

send는 크게 3가지의 역할을 합니다

  1. 미래의 응답 값이 처리된다면 불릴 Handler지정
  2. ClientRequest 객체 생성
  3. 노드별 ClientRequest를 unsent에 담기

 

1번의 코드를 2번의 코드 ClientRequet의 매개변수로 넣어주네요

지금은 아니지만 1번의  나중 역할은 보내는 Request가 성공, 실패했을 때 ConsumerNetworkClient의 pendingCompletion에 넣어두는 역할을 합니다.

(해당 부분은 뒤에 나올 NetworkClient의 poll 메서드 중 handleCompletedSend의 부분에서 처리되어 pendingCompletion에 들어갑니다.)

ConsumerNetworkClient는 pendingCompletion를 갖고 있습니다.
pendingCompletion 역할은 ClientRequest가 요청 실패했던 성공했던 일단 브로커로부터 응답이 온 Request들을 모아두고 있습니다

아래의 onComplete, onFailure이 pendingCompletion에 넣어주는 부분입니다.  (코드를 보시려면 더보기를 눌러주세요)

더보기
ConumerNetwrokClient#RequestFuturerㅇcompletionHandler

2번 코드를 다시 보면 많은 매개변수가 있지만 expectResponse: true라는 것을 한번 봅시다. 

우리는 이 요청이 브로커의 응답을 받아야만 하는 작업이면 true로 설정하고 요청만 보내고 끝나는 작업은 false로 표기하여 보냅니다. 

(아래 handleCompleteSends 부분에서 나옵니다)

 

 

3번 코드를 보면 위에서 처음 말한 내용이 기억나시나요?

ConsumerNetworkClient의 unsent라는 Map<node,newClientRequest> 자료구조에 담아놓습니다.

해당 부분 코드가 그 역할을 담당하고 있습니다.

unsent라는 곳에 노드별 원하는 request(그룹코디네이터찾기, 데이터가져오기,etc) 들을 담아서 가지고 있습니다.

 

unsent라는 곳에 담아서 데이터 통신을 하여 보내주는 역할은 KafkaConsumer의 pollForFetches라는 부분에서 호출됩니다.

 

갑자기 PollForFetches는 무슨 얘기지? 하시는 분들이 있으실 겁니다. 

1. Fetcher에게 Consumer에 할당된 Topic partition을 가져오라고 명령합니다. 
2. ConsumerNetworkClient의 send 요청을 보낸다.

이 부분이 호출되는 곳의 코드를 보면 KafkaConsumer의 pollForFethes라는 메서드가 있습니다.

정리해보면 위의 첫 번째, 두 번째 단계는  아래 1번 해당 코드 부분에서 호출되는 부분입니다.

sendFetches메서드에 노드별 ClientRequest 세팅을 해놓았고 위에서 unsent에 push 해놨습니다. (노드에게 ClientRequest 보내기전 준비를 마친상태)

KafkaConsumer#pollForFetches

 

2번 코드 부분에서 실제 ConsumerNetworkClient의 poll메서드를 호출시켜 unsent에 담겨있는 것을 실제 보내는 역할을 맡고 있습니다.

 

큰 목차 2. ConsumerNetworkClient의 send 요청을 보낸다.
메서드 이름이 send라 실제로 네트워크를 타고 보낸다고 해석할 수 있습니다. 하지만 여기선 노드별 ClientRequest를 준비하는 과정이라는 게 더 맞을 것 같습니다. 
추가적으로 2번 코드 부분의 client는 ConsumerNetworkClient 객체입니다.

큰 목차 1번,2번을 정리해보면 아래와 같은 그림이 되겠네요

 


3. ConsumerNetworkClient가 NetworkClient에게 실제 요청을 보내고 응답 값을 callback으로 completedFetches에 받는다.

poll은 크게 4가지의 역할을 합니다

  1. NetworkClient에게 ClientRequest 전송
  2. NetworkClient에게 응답값 가져오기
  3. ClientRequest를 보냈기 때문에 unsent 비우기 
  4. callbck 메서드 실행

 

그럼 이제 위 2번 코드 poll 메서드 안으로 들어가 봅시다.

 

ConsumerNetworkClient#poll

1번 코드에서는 unsent에 담아놓았던 거를 실제로 NetworkClient에게 보내는 요청이 들어갑니다.

(코드를 보려면 더보기를 눌러주세요)

더보기
ConsumerNetworkClient#trySend

여기서 client는 NetworkClient입니다. 

 해당 메서드를 타고 들어가면 doSend를 호출합니다.

NetworkClient#doSend

맨 아랫줄을 보면 실제로 selector를 통해서 통신하는 게 보이네요.

그 윗줄인 inFlightRequests를 봐봅시다.

 

NetworkClient는 브로커로 요청을 보냈는데 아직 응답이 오직 않은 것들을 inFlightRequests에 담아서 관리합니다.

현재 전송을 보냈기때문에 inFlightRequests에 담아줍니다.

2번 코드에서는 NetworkClient에게 요청을 보냈으니 NetworkClient의 poll을 호출하여 응답 값들을 받아오려고 합니다.

(코드를 보려면 더보기를 눌러주세요)

더보기
NetworkClient#poll

1번 코드는 selector를 통해 통신하니 실제 거기서도 응답 값을 가져오는 코드입니다.

 

2번 코드는브로커 요청 완료되어 있는 것을 확인해줄 수 있는 this.selector.completedSends()를 호출합니다.

해당 노드로 보낸 inFlightRequests의 가장 오래된 request를 확인하여 브로커의 응답이 필요하지 않은 것은 InflighRequests에서 빼는 것을 확인할 수 있습니다.

 

저희의 요청은 expectResponse: true이기 때문에 해당 if문으로 들어가지 않습니다.

 

3번 코드는 selector에게 수신받은 것에 대하여 응답 확인한 것에 대해  InflighRequests에서 빼는 것을 확인할 수 있습니다.

 4번 코드는 2,3번을 통해 response가 끝난 것들에 대해 onComplete()를 호출합니다.

기억나실지 모르지만 RequestFutureResponseHandler 기억나실까요?

다시 한번 사진을 가져오겠습니다.

ConumerNetwrokClient#RequestFuturerㅇcompletionHandler

여기서 위의 oncomplete()결과로 pendingCompletion에 추가가 됩니다.

현재 저희는 NetworkClient안에서 놀고 있지만  ConsumerNetworkClient에게도 이 응답사실을 전파해야 합니다.

이 역할을 pendingCompletion이 이어주고 있다고 생각하면 편합니다.

 3번 코드는 unsent에 담겨있던 ClientRequest는 다 전송했으니 비워줍니다. 

 4번 코드는 pendingCompletion에 넣어있는 것들을 꺼내서  callback메서드를 실행합니다.

ConsumerNetworkClient#firePendingCompletedRequests

 

아까는 RequestFutureCompletionHandler에 의해 pendingcompletion에 추가하였고
이번엔 pendingCompletion을 꺼내 pendingCompletion에 등록된 callback메서드를 fireCompletion으로 인해 호출되는것을 볼 수있습니다.

 

아래의 fireCompletion코드를 보면 future.complete(response)가 보이실 겁니다.

ConumerNetwrokClient#RequestFuturercompletionHandler

 

 

위에 작성한 글을 다시 가져와 보겠습니다.

해당 Callback메서드가 호출되는 시점은 누군가에 의해 성공했다고 판단되었고 onSuccess의 매개변수에 ClientResponse가 담아져 있었을 겁니다.

fireCompletion에 호출되는 동시에
등록된 response를 onSuccess(ClinetResponse resp)의 매개변수로 넘겨주는 거를 확인할 수 있습니다.

언제 이 callback을 주입했을까요? 맨 처음 부분의 Fetcher#sendFetches부분에서 넣어줬습니다.

 

성공시 Callback메서드 주입

이제 completedFetches에 응답 값 데이터를 넣어줄 수 있게 됐네요.

 

poll메서드 안에서 처음에 그리고 마지막 firePendingCompletedRequests() 작업이 두 번이나 호출하는것을 볼수 있습니다.
poll은 주기적으로 호출되는 메서드이기 때문에 전에 요청한 poll 부분이 이제 응답이 완료되어서  먼저 처리해주는 겁니다. 
그리고 try문안에서 네트워크 전송 요청이 끝났기 때문에 다시 한번 firePendingCompletedRequests를 호출하는 거죠

 

4.  KakaConsumer에게 실제 데이터 넘겨준다.

pollForFetches를 기억하시나요? 

여기서 return문을 봐봅시다.

fetcher.collectFetch() 메서드명만 봐도 우리가 담아놓은 completedFetches의 값을 가져가는 역할을 맡고 있습니다.

1번 코드에서는 completedFetches에서 어디부터 읽어오는지 확인하고 해당 코드를 들어가면 completedFetch에서 drain으로 값을 전달하고 있습니다. (코드를 보려면 더보기를 눌러주세요)

 

 

2번 코드에서는 drain해온 모든 데이터들을 return문에 담아 Client에게 전송합니다.

 

정리해보면 아래와 같은 그림이 되겠네요

 

마치며

이렇게 Client단까지 컨숨한 데이터를 전송하는지 알아봤습니다.

단순히 우리는 poll 한번 메서드를 호출하지만 내부를 보면 생각보다 복잡합니다.

그리고 각 컴포넌트 (ConsumerNetworkClient,NetworkClient)에게 응답 결과를 전 달해 주기 위해 callback을 사용하는 것을 보았습니다.

기회가 되면 회사에서  한 consumer 여러개의 broker의 여러 토픽을 consume 하는 코드를 작성했는데 소개하고싶네요 

 

 

 

+ Recent posts