An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
요청을 할 때 서버에 전달할 ID 문자열입니다.
이는 서버 측 요청 로깅에 논리적 애플리케이션 이름이 포함되도록 허용하여 단순한 IP/포트를 넘어 요청의 출처를 추적할 수 있도록 하기 위한 것입니다.
목적
브로커에 찍힌 컨수머의 로깅 추척
배경
아래의 링크 Description을 보면 브로커에서 어느 컨슈머에서 요청을 한것인지 request log를 보는데 어느 컨수머인지 확인이 불편했습니다.
많은 컨수머 중 어떤 컨수머 그룹에서 요청을 보냈는지 알 수 없었던 것입니다. 따라서 ClientId가 컨수머 그룹 ID 정보까지 저장하는 형태로 변경하기로 했습니다.
기존에는 ClientId 값을 Auto increment로 할당을 해주는것을 볼수있습니다.
변경된 코드 부분에서는 GroupId(컨수머 그룹) 값도 넣는것을 볼 수 있습니다.
원래 ClientId 할당 로직이 KafkaConsumer 생성자에 있었지만 현재는 ConsumerConfig 클래스에서 책임을 갖도록 리팩토링되었습니다.
2. member.id
member.id에 대해 정확히 나온곳이없어서 카프카 이슈를 둘러보다가 발견했습니다.
A new consumer joins the group with member.id field set as UNKNOWN_MEMBER_ID (empty string), since it needs to receive the identity assignment from broker first. For request with unknown member id, broker will blindly accept the new join group request, store the member metadata and return a UUID to consumer.
새 Consumer는 브로커로부터 신원 할당을 먼저 받아야 하므로 member.id 필드가 UNKNOWN_MEMBER_ID(빈 문자열)로 설정된 상태로 그룹에 가입합니다. 알수없는 member.id 요청의 경우, 브로커는 새 그룹 가입 요청을 맹목적으로 수락하고 멤버 메타데이터를 저장한 후 소비자에게 UUID를 반환합니다.
목적
group Coordinator(broker 중 하나)과 컨수머 JoinGroup 요청시 고유 식별자 memberId를 보내 사용됩니다.
A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
이 소비자가 속한 소비자 그룹을 식별하는 고유 문자열입니다. 이 속성은 소비자가 subscribe(topic)을 사용하여 그룹 관리 기능을 사용하거나 카프카 기반 오프셋 관리 전략을 사용하는 경우 필요합니다.
목적
컨수머 토픽의 논리적 그룹
배경
구독하는 토픽에대해 컨수머들을 묶어서 관리해야하는데 필요한 KafkaConfig 설정입니다.
A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.
최종 사용자가 제공한 소비자 인스턴스의 고유 식별자입니다. 비어 있지 않은 문자열만 허용됩니다. 설정하면 소비자는 정적 멤버로 취급되며, 이는 소비자 그룹에서 언제든지 이 ID를 가진 인스턴스 하나만 허용됨을 의미합니다. 이는 일시적인 사용 불가(예: 프로세스 재시작)로 인한 그룹 재조정을 방지하기 위해 더 큰 세션 시간 초과와 함께 사용할 수 있습니다. 설정하지 않으면 소비자는 기존 동작인 동적 멤버로 그룹에 참여하게 됩니다.
목적
리밸런스시 파티션이 재분배 되지 않게하기 위한 설정
배경
2.3 버전 이후 생긴 개념입니다.
컨수머 그룹에 새로운 컨수머가 들어오거나 떠나게되면 파티션 리밸런스 현상이 발생합니다. 그 영향으로 컨수머 그룹 안 컨수머들은 모두 stop the world 현상에 빠지게됩니다. 그동안 브로커에는 토픽이 쌓여 들어온 데이터와 commit offset의 lag는 증가하게 될겁니다.
컨수머가 다운됐을때는 리밸런스가 무조건 일어나야하지만 내가 의도한 상황(ex. 재기동)일때 파티션이 다시 재분배 상태일때는 너무 큰 비용을 낭비하게됩니다.
이런 문제점을 해소하기 위해 정적 멤버라는 개념이 추가됐습니다.
KafkaConsumerConfig에 group.instance.id를 기입하면 정적 멤버라고 인식하게되어 컨수머 그룹 이탈시 LeaveGroup 호출을 하지 않게됩니다.
정적 멤버가 떠나며 LeaveGroup을 호출하는 유일한 경우는 session.time.out으로 인해서만 발생하게됩니다.
브로커의 데이터를 가져오기 위하여 보통 spring-kafka의@kafkaListener 어노테이션을 등록하여 사용하거나 아래와 같이 poll 메서드를 작성하는 경우가 많습니다. 컨수머에 대한아키텍처 구성에 대해서 알아보고 어떤 흐름을 통해 데이터를컨숨할수 있을지 알아보려고 합니다.
public class Consumer {
public void kafkaClient() {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "consumer");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("consumer"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}
Kafka는 용어가 많기 때문에 혼란이 되어서 키워드 별로 하는 일을 정확히 명시해야 이해하기 쉽습니다.
GroupCoordinator : 브로커 중 하나로 컨슈머 그룹의 메타데이터와 그룹을 관리 ConsumerCoordinator : HeartbeatThread 생성, 컨수머 그룹 파티션 재할당 ConsumerNetworkClient : 컨수머의 Request를 NetworkClient로 보내는 주체 NetworkClient : ConsumerNetworkClient에게 받아 브로커에게 통신을 보내는 주체 Fetcher : 브로커 노드에게 데이터 요청하는 주체 SubscriptionState : 컨수머의 구독상태 오프셋 상태 관리
Consumer는 데이터만을 가져오게 감쳐줘 있지만 사실 데이터를 가져오기 위해서는 위 정리한 키워드 GroupCoordinator(Broker 객체)를 제외하고 객체들이 필요합니다.
전체적인 큰 그림을 보기 위하여 Consumer가 ConsumerNetworkClient에게 Request를 보낼 것들이 뭐가 있을까 생각해봅시다.
컨수머는 GroupCoordinator의 정보에 의존하여 메타데이터를 받아옵니다. 브로커가 여러개있을 경우(=브로커의 서버가 여러대) 브로커 중 GroupCoordinator를 찾는 요청도 있을 겁니다.
기본적으로 우리의 첫 번째 목적인브로커에 있는 데이터 요청도 있을 겁니다. 또한 Consumer가 받아온데이터를 읽었다는 처리 Flag(commit) 를 브로커에게 넘겨주는 요청도 있을 겁니다.
이외에도 사실 요청이 굉장히 많습니다.
위 얘기를 꺼낸 이유는 이 모든 요청을 보내기 위해 ConsumerNetworkClient의 unsent라는Map<node,newClientRequest>자료구조에 담아놓습니다. 노드별 원하는 Request를 설정하여 map에 쌓여있는 것들을NetworkClient에게 보내고Broker한테서 응답이 오면 NetworkClient에게 Response를 받고원하는 데이터 형태를 셋팅하여 우리가 원하는 일들을 수행합니다.
1번 코드는 selector를 통해 통신하니 실제 거기서도 응답 값을 가져오는 코드입니다.
2번 코드는브로커 요청 완료되어 있는 것을 확인해줄 수 있는 this.selector.completedSends()를 호출합니다.
해당 노드로 보낸 inFlightRequests의 가장 오래된 request를 확인하여 브로커의 응답이 필요하지 않은 것은 InflighRequests에서 빼는 것을 확인할 수 있습니다.
저희의 요청은 expectResponse: true이기 때문에 해당 if문으로 들어가지 않습니다.
3번 코드는 selector에게 수신받은 것에 대하여 응답 확인한 것에 대해 InflighRequests에서 빼는 것을 확인할 수 있습니다.
4번 코드는 2,3번을 통해 response가 끝난 것들에 대해 onComplete()를 호출합니다.
기억나실지 모르지만 RequestFutureResponseHandler 기억나실까요?
다시 한번 사진을 가져오겠습니다.
여기서 위의 oncomplete()결과로 pendingCompletion에 추가가 됩니다.
현재 저희는 NetworkClient안에서 놀고 있지만 ConsumerNetworkClient에게도 이 응답사실을 전파해야 합니다.
이 역할을 pendingCompletion이 이어주고 있다고 생각하면 편합니다.
3번 코드는 unsent에 담겨있던 ClientRequest는 다 전송했으니 비워줍니다.
4번 코드는 pendingCompletion에 넣어있는 것들을 꺼내서 callback메서드를 실행합니다.
아까는 RequestFutureCompletionHandler에 의해 pendingcompletion에 추가하였고 이번엔 pendingCompletion을 꺼내 pendingCompletion에 등록된 callback메서드를 fireCompletion으로 인해 호출되는것을 볼 수있습니다.
아래의 fireCompletion코드를 보면 future.complete(response)가 보이실 겁니다.
위에 작성한 글을 다시 가져와 보겠습니다.
해당 Callback메서드가 호출되는 시점은 누군가에 의해 성공했다고 판단되었고 onSuccess의 매개변수에 ClientResponse가 담아져 있었을 겁니다.
fireCompletion에 호출되는 동시에 등록된 response를 onSuccess(ClinetResponse resp)의 매개변수로 넘겨주는 거를 확인할 수 있습니다.
언제 이 callback을 주입했을까요? 맨 처음 부분의 Fetcher#sendFetches부분에서 넣어줬습니다.
이제 completedFetches에 응답 값 데이터를 넣어줄 수 있게 됐네요.
poll메서드 안에서 처음에 그리고 마지막 firePendingCompletedRequests() 작업이 두 번이나 호출하는것을 볼수 있습니다. poll은 주기적으로 호출되는 메서드이기 때문에 전에 요청한 poll 부분이 이제 응답이 완료되어서 먼저 처리해주는 겁니다. 그리고 try문안에서 네트워크 전송 요청이 끝났기 때문에 다시 한번 firePendingCompletedRequests를 호출하는 거죠
4. KakaConsumer에게 실제 데이터 넘겨준다.
pollForFetches를 기억하시나요?
여기서 return문을 봐봅시다.
fetcher.collectFetch() 메서드명만 봐도 우리가 담아놓은 completedFetches의 값을 가져가는 역할을 맡고 있습니다.
1번 코드에서는 completedFetches에서 어디부터 읽어오는지 확인하고 해당 코드를 들어가면 completedFetch에서 drain으로 값을 전달하고 있습니다. (코드를 보려면 더보기를 눌러주세요)