산업기술
Apache Kafka는 데이터 수집, 저장, 처리 및 재배포를 위한 프레임워크입니다. 현재 전 세계 기업에 널리 보급되어 있습니다. Kafka의 공식 웹사이트는 아이디어와 배포 방법에 대한 자세한 정보를 제공합니다. 주요 기능 중 하나는 MQTT와 같은 다른 애플리케이션 및 통신 프로토콜에 대한 수많은 기존 커넥터입니다.
MQTT는 견고성과 작은 설치 공간으로 인해 IoT 통신에 자주 사용되는 경량 TCP 기반 메시징 프로토콜입니다. OASIS 표준 MQTT에 대한 자세한 내용은 웹사이트에서 확인할 수 있습니다.
여기에서 Eclipse의 MQTT 구현인 PLCnext용 mosquitto를 교차 컴파일하는 방법에 대한 Makers Blog 기사를 찾을 수 있습니다. 또는 PLCnext Store에서 준비된 MQTT 앱을 제공합니다.
다음 그림은 PLCnext 컨트롤에서 Kafka로 데이터를 수집하기 위해 구현하려는 설정의 개요를 보여줍니다. Kafka 버전(2)에 Confluent의 MQTT 프록시를 사용할 수 있지만 보다 일반적인 솔루션(1)에 중점을 둘 것입니다. 클라이언트가 메시지에 연결하고 메시지를 게시하는 MQTT 브로커와 브로커에서 주제를 구독하고 메시지를 처리하고 Kafka로 전달하는 커넥터로 구성됩니다.
이 자습서에서 커넥터는 MIT 라이선스(자세한 라이선스 정보)에 따라 라이선스가 부여된 GitHub의 evokly/kafka-connect-mqtt 리포지토리를 기반으로 합니다. 먼저 저장소를 다운로드하고 추출합니다. 최신 저장소 버전이 2016년 말이므로 build.gradle
을 업데이트합니다. 파일, 이전 종속성을 새 버전으로 교체:
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
이 예에서는 일반 문자열 메시지를 Kafka에 보냅니다. 따라서 Java 클래스 DumbProcessor.java
를 편집해야 합니다. /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
폴더에서 , 기본 메시지 프로세서:
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
그런 다음 종속성을 포함하는 Java 아카이브 파일(JAR)을 빌드합니다. ./gradlew clean jar
. 출력 JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar
을 복사합니다. /kafka-connect-mqtt-master/build/libs
폴더에서 찾을 수 있습니다. libs
으로 카프카 디렉토리.
Kafka의 libs 디렉토리에 있는 org.eclipse.paho.client.mqttv3-1.2.5.jar 아카이브 사본도 필요합니다. 여기에서 다운로드할 수 있습니다.
또한 mqtt.properties
커넥터에 대한 구성 파일을 만들어야 합니다. Kafka의 config
에서 폴더. 파일 내용은 다음과 같습니다.
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
이제 커넥터를 로컬에서 테스트할 수 있습니다. Kafka의 디렉토리로 이동하여 ZooKeeper 및 Broker 인스턴스를 시작합니다.
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
메시지가 콘솔 소비자에 표시됩니다.
산업기술
기술적 배경 카프카 Apache Kafka는 데이터 수집, 저장, 처리 및 재배포를 위한 프레임워크입니다. 현재 전 세계 기업에 널리 보급되어 있습니다. Kafka의 공식 웹사이트는 아이디어와 배포 방법에 대한 자세한 정보를 제공합니다. 주요 기능 중 하나는 MQTT와 같은 다른 애플리케이션 및 통신 프로토콜에 대한 수많은 기존 커넥터입니다. MQTT MQTT는 견고성과 작은 설치 공간으로 인해 IoT 통신에 자주 사용되는 경량 TCP 기반 메시징 프로토콜입니다. OASIS 표준 MQTT에 대한 자세한 내용은 웹사이트에서 확인할
PLCnext 기능 앱 easymon 컨트롤의 원격 모니터링을 설정하는 번거로움을 없애줍니다. 클라우드 기반 IoT 서비스의 모든 핵심 부분을 구성하는 대신 모바일 장치에서 모니터링하려는 PLCnext 프로젝트 변수를 선택하고 알림 임계값을 설정하기만 하면 됩니다. 이 구성은 명시적으로 선택한 변수에 대한 데이터만 클라우드로 전파하기 위해 PLCnext에서 실행되는 easymon 구성기에서 발생합니다. PLCnext 저장소는 연결된 PLCnext 컨트롤러에서 기능 확장의 설치 및 관리를 용이하게 하는 데 큰 역할을 합니다.