著者:杉本 拓
「Red Hat Integration」はアプリやデータの連携を実現するための、インテグレーションパターン、API 連携、API管理とセキュリティ、データ変換、リアルタイムメッセージング、データストリーミングなどを提供するオープンソース製品です。同製品には多くの機能が含まれていますが、本連載ではその概要と一部の機能を紹介します。
シェルスクリプトマガジン Vol.67は以下のリンク先でご購入できます。
図18 二つのdependencyを追加する
1 2 3 4 5 6 7 8 9 |
<dependency> <groupId>org.jboss.resteasy</groupId> <artifactId>resteasy-jackson2-provider</artifactId> </dependency> <dependency> <groupId>io.apicurio</groupId> <artifactId>apicurio-registry-utils-serde</artifactId> <version>1.2.1.Final</version> </dependency> |
図19 「AvroRegistryExample.java」ファイルの内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package com.redhat.kafka.registry; import java.io.File; import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeUnit; import javax.enterprise.context.ApplicationScoped; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.eclipse.microprofile.reactive.messaging.Outgoing; import io.reactivex.Flowable; import io.smallrye.reactive.messaging.kafka.KafkaRecord; @ApplicationScoped public class AvroRegistryExample { private Random random = new Random(); private String[] symbols = new String[] { "RHT", "IBM", "MSFT", "AMZN" }; @Outgoing("price-out") public Flowable<KafkaRecord<String, Record>> generate() throws IOException { Schema schema = new Schema.Parser().parse( new File(getClass().getClassLoader().getResource("price-schema.avsc").getFile()) ); return Flowable.interval(1000, TimeUnit.MILLISECONDS) .onBackpressureDrop() .map(tick -> { Record record = new GenericData.Record(schema); record.put("symbol", symbols[random.nextInt(4)]); record.put("price", String.format("%.2f", random.nextDouble() * 100)); return KafkaRecord.of(record.get("symbol").toString(), record); }); } } |
図20 「price-schema.avsc」ファイルの内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
{ "type": "record", "name": "price", "namespace": "com.redhat", "fields": [ { "name": "symbol", "type": "string" }, { "name": "price", "type": "string" } ] } |
図21 登録されたAvroのスキーマ
1 2 3 4 5 6 7 8 |
{ "createdOn": 1575919739708, "modifiedOn": 1575919739708, "id": "prices-value", "version": 1, "type": "AVRO", "globalId": 4 } |
図22 プロパティファイル
1 2 3 4 5 6 7 8 9 10 11 |
# Configuration file kafka.bootstrap.servers=localhost:9092 mp.messaging.outgoing.price-out.connector=smallrye-kafka mp.messaging.outgoing.price-out.client.id=price-producer mp.messaging.outgoing.price-out.topic=prices mp.messaging.outgoing.price-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.price-out.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer mp.messaging.outgoing.price-out.apicurio.registry.url=http://localhost:8081/api mp.messaging.outgoing.price-out.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy |