IT序号网

Java 发送信息到 kafka 的util

flyfish 2021年06月11日 编程语言 406 0

使用Java代码,发送数据到 kafka 去。

public class KafkaSendUtils { 
 
    private static Producer<String, String> producer; 
 
 
    /** 
     * 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成 
     *      
     */ 
    private synchronized static void initProperties(KafkaConfiguration setting) { 
        if (producer == null) { 
            Properties props = new Properties(); 
            props.put("bootstrap.servers", setting.getServers()); 
            props.put("acks", setting.getAcks()); 
            props.put("retries", setting.getRetries()); 
            props.put("batch.size", setting.getBatchSize()); 
            props.put("linger.ms", setting.getLingerMs()); 
            props.put("buffer.memory", setting.getBufferMemory()); 
            props.put("key.serializer", setting.getKeySerializer()); 
            props.put("value.serializer", setting.getValueSerializer()); 
            producer = new KafkaProducer<>(props); 
        } 
    } 
 
    /** 
     * 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String。 
     * 
     * @param jsonString json信息 
     */ 
    public static void sendMsgToKafka(String jsonString, KafkaConfiguration setting) { 
        initProperties(setting); 
        producer.send(new ProducerRecord<>(setting.getTopic(), String.valueOf(System.currentTimeMillis()), jsonString)); 
    } 
 
}

使用到配置信息(实现到接口,没啥特别到,就是起到一个标记到作用。)

/** 
 * 发数据到kafka的相关配置信息 
 * (@JSONField(ordinal = 1) 对象转json字符串时key按设置顺序输出) 
 * 
 * @author lxk on 2018/10/18 
 */ 
@Data 
public class KafkaConfiguration implements ConsumerConfigurationInstance { 
    /** 
     * kafka 服务器ip:端口号,集群用逗号分隔 
     * 例如:192.168.1.46:9092 
     */ 
    @JSONField() 
    private String servers; 
    /** 
     * 例如:all 
     */ 
    @JSONField(ordinal = 1) 
    private String acks; 
    /** 
     * 例如:0 
     */ 
    @JSONField(ordinal = 2) 
    private int retries; 
    /** 
     * 例如:16384 
     */ 
    @JSONField(ordinal = 3) 
    private int batchSize; 
    /** 
     * 例如:1 
     */ 
    @JSONField(ordinal = 4) 
    private int lingerMs; 
    /** 
     * 例如:33554432 
     */ 
    @JSONField(ordinal = 5) 
    private Long bufferMemory; 
    /** 
     * 例如:org.apache.kafka.common.serialization.StringSerializer 
     */ 
    @JSONField(ordinal = 6) 
    private String keySerializer; 
    /** 
     * 例如:org.apache.kafka.common.serialization.StringSerializer 
     */ 
    @JSONField(ordinal = 7) 
    private String valueSerializer; 
    /** 
     * kafka的topic 
     */ 
    @JSONField(ordinal = 8) 
    private String topic; 
 
    @Override 
    public String toString() { 
        return "KafkaConfiguration{" + 
                "servers='" + servers + '\'' + 
                ", acks='" + acks + '\'' + 
                ", retries=" + retries + 
                ", batchSize=" + batchSize + 
                ", lingerMs=" + lingerMs + 
                ", bufferMemory=" + bufferMemory + 
                ", keySerializer='" + keySerializer + '\'' + 
                ", valueSerializer='" + valueSerializer + '\'' + 
                ", topic='" + topic + '\'' + 
                '}'; 
    } 
 
    //例如: 
    //#发送 kafka 配置信息 
    //kafka.servers = 192.168.1.46:9092 
    //kafka.acks = all 
    //kafka.retries = 0 
    //kafka.batch.size = 16384 
    //kafka.linger.ms = 1 
    //kafka.buffer.memory = 33554432 
    //kafka.key.serializer = org.apache.kafka.common.serialization.StringSerializer 
    //kafka.value.serializer = org.apache.kafka.common.serialization.StringSerializer 
}

这个信息发送到kafka之后,kafka里面的数据只能被消费一次,消费之后,他就不见了。所以,你在看你的数据有没有发送到kafka到时候,注意下,你使用到查看方式要是消费到方式的话,那看一下,数据就没有了,第二次再看的时候,数据就不在了,可不是你数据没发送成功。

代码简单,就是记录下笔记吧,万一啥时候再用到呢,可以,直接拿来用。

我写完文章,给自己点个赞,不过分吧,
不过分,那我可就点啦啊。
我先点为敬,你们随意。大家随意。不要客气。。。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!