Dev. Blog isprometheo의 좌충우돌 개발 블로그
Posts with the tag kafka:

[Golang] Kafka 연동 문제

Go로 만든 프로젝트에서 kafka 연동하여 로그를 수집하는데 confluent-kafka-go를 사용하고 있었다. package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var kafkaClient *kafka.Producer func main() { kafkaClient, _ = kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "broker:9092", "acks": 1, }) go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: m := ev if m.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v,

[Log] librdkafka 빌드하기

golang 프로젝트의 로그를 수집하기 위해 confluent-kafka-go를 이용했다. 이를 위해서 librdkafka를 설치해야 하는데 confluent-kafka-go 최신 버전의 경우 1.3.0 이상을 사용해야 한다는 에러가 났다. CeontOS 7에서 yum으로 설치하는

[Log] Logstash Kafka 연동 에러 해결

Lostash에서 input으로 kafka를 사용하고 있었는데 다음과 같은 오류가 나왔다. [WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?] at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?] 중복되는 설정들이 많아 합치면서 pipeline을 사용하도록 수정했는

[Log] Fluentd Kafka 지연 해결

로그를 분석하기 위해 Fluentd, Kafka, ELK(Elastic Search, Logstash, Kibana)를 연동하여 사용하고 있는데 특정 토픽의 로그가 지연되고 있었다. 처음엔 Logstash에서 Elastic Search로 전송하여 인덱싱하는 게 느린 것으로 판단하여 Elastic Search의 설정을

[Log] Fluentd Kafka Logstash 연동

ELK를 이용하여 로그 수집/분석을 하기 위해 Fluentd와 Kafka를 연동할 필요가 있었다. 그래서 Fluentd에서 Kafka로 로그를 전송할 수 있도록 설정을 추가했는데 Kafka가 죽어서(원인은 파악하지 못했다