산업 제조
산업용 사물 인터넷 | 산업자재 | 장비 유지 보수 및 수리 | 산업 프로그래밍 |
home  MfgRobots >> 산업 제조 >  >> Manufacturing Technology >> 산업기술

MQTT를 통해 Apache Kafka에 PLCnext Control 연결

기술적 배경

카프카

Apache Kafka는 데이터 수집, 저장, 처리 및 재배포를 위한 프레임워크입니다. 현재 전 세계 기업에 널리 보급되어 있습니다. Kafka의 공식 웹사이트는 아이디어와 배포 방법에 대한 자세한 정보를 제공합니다. 주요 기능 중 하나는 MQTT와 같은 다른 애플리케이션 및 통신 프로토콜에 대한 수많은 기존 커넥터입니다.

MQTT

MQTT는 견고성과 작은 설치 공간으로 인해 IoT 통신에 자주 사용되는 경량 TCP 기반 메시징 프로토콜입니다. OASIS 표준 MQTT에 대한 자세한 내용은 웹사이트에서 확인할 수 있습니다.

여기에서 Eclipse의 MQTT 구현인 PLCnext용 mosquitto를 교차 컴파일하는 방법에 대한 Makers Blog 기사를 찾을 수 있습니다. 또는 PLCnext Store에서 준비된 MQTT 앱을 제공합니다.

요구사항

설정

다음 그림은 PLCnext 컨트롤에서 Kafka로 데이터를 수집하기 위해 구현하려는 설정의 개요를 보여줍니다. Kafka 버전(2)에 Confluent의 MQTT 프록시를 사용할 수 있지만 보다 일반적인 솔루션(1)에 중점을 둘 것입니다. 클라이언트가 메시지에 연결하고 메시지를 게시하는 MQTT 브로커와 브로커에서 주제를 구독하고 메시지를 처리하고 Kafka로 전달하는 커넥터로 구성됩니다.

커넥터 생성

이 자습서에서 커넥터는 MIT 라이선스(자세한 ​​라이선스 정보)에 따라 라이선스가 부여된 GitHub의 evokly/kafka-connect-mqtt 리포지토리를 기반으로 합니다. 먼저 저장소를 다운로드하고 추출합니다. 최신 저장소 버전이 2016년 말이므로 build.gradle을 업데이트합니다. 파일, 이전 종속성을 새 버전으로 교체:

ext { kafkaVersion = '2.6.0' }
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.13' 
    compile "org.apache.kafka:connect-api:$kafkaVersion"
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' 
    compile 'org.bouncycastle:bcprov-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpkix-jdk15on:1.67' 
    compile 'org.bouncycastle:bcpg-jdk15on:1.67' 
    compile 'commons-io:commons-io:2.8.0' 
    compile 'org.slf4j:slf4j-api:1.7.30' 
    testCompile 'org.slf4j:slf4j-simple:1.7.30'
}

이 예에서는 일반 문자열 메시지를 Kafka에 보냅니다. 따라서 Java 클래스 DumbProcessor.java를 편집해야 합니다. /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt 폴더에서 , 기본 메시지 프로세서:

@Override
public SourceRecord[] getRecords(String kafkaTopic) {

    return new SourceRecord[]{new SourceRecord(null, //sourcePartition
                   null,                //sourceOffset
                   kafkaTopic,          //topic
                   null,                //partition
                   null,                //keySchema
                   mTopic,              //key
                   null,                //valueSchema
                   mMessage.toString(), //value
                   new Long(123L))};    //long timestamp
}

그런 다음 종속성을 포함하는 Java 아카이브 파일(JAR)을 빌드합니다. ./gradlew clean jar . 출력 JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar을 복사합니다. /kafka-connect-mqtt-master/build/libs 폴더에서 찾을 수 있습니다. libs으로 카프카 디렉토리.

Kafka의 libs 디렉토리에 있는 org.eclipse.paho.client.mqttv3-1.2.5.jar 아카이브 사본도 필요합니다. 여기에서 다운로드할 수 있습니다.

또한 mqtt.properties 커넥터에 대한 구성 파일을 만들어야 합니다. Kafka의 config에서 폴더. 파일 내용은 다음과 같습니다.

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

kafka.topic=test_in                     # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123

mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://172.17.0.1:1883  # address of the MQTT broker
mqtt.topic=test/#                       # MQTT topic where the messages should be collected

#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor

로컬 테스트

이제 커넥터를 로컬에서 테스트할 수 있습니다. Kafka의 디렉토리로 이동하여 ZooKeeper 및 Broker 인스턴스를 시작합니다.

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# start Kafka:
bin/kafka-server-start.sh config/server.properties

# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto 

# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true

# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123

메시지가 콘솔 소비자에 표시됩니다.


산업기술

  1. 모터 제어 회로
  2. 제어 회로
  3. 메시징을 위해 Apache Kafka를 지원하는 Eclipse Hono
  4. 원격 생산 제어의 5가지 장점
  5. 비아의 임피던스 제어 및 PCB 설계에서 신호 무결성에 미치는 영향
  6. SNMP를 통한 PLCnext Control 장치 관리
  7. PLCnext의 클러스터 관리?
  8. PLCnext Tableau 대시보드
  9. PLCnext Power BI 보고서
  10. PLCnext Control의 Java 애플리케이션