著者:杉本 拓
「Red Hat Integration」はアプリやデータの連携を実現するための、インテグレーションパターン、API 連携、API管理とセキュリティ、データ変換、リアルタイムメッセージング、データストリーミングなどを提供するオープンソース製品です。同製品には多くの機能が含まれていますが、本連載ではその概要と一部の機能を紹介します。
シェルスクリプトマガジン Vol.67は以下のリンク先でご購入できます。![]()
![]()
図18 二つのdependencyを追加する
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jackson2-provider</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>1.2.1.Final</version>
</dependency>
図19 「AvroRegistryExample.java」ファイルの内容
package com.redhat.kafka.registry;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
@ApplicationScoped
public class AvroRegistryExample {
private Random random = new Random();
private String[] symbols = new String[] { "RHT", "IBM", "MSFT", "AMZN" };
@Outgoing("price-out")
public Flowable<KafkaRecord<String, Record>> generate() throws IOException {
Schema schema = new Schema.Parser().parse(
new File(getClass().getClassLoader().getResource("price-schema.avsc").getFile())
);
return Flowable.interval(1000, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.map(tick -> {
Record record = new GenericData.Record(schema);
record.put("symbol", symbols[random.nextInt(4)]);
record.put("price", String.format("%.2f", random.nextDouble() * 100));
return KafkaRecord.of(record.get("symbol").toString(), record);
});
}
}
図20 「price-schema.avsc」ファイルの内容
{
"type": "record",
"name": "price",
"namespace": "com.redhat",
"fields": [
{
"name": "symbol",
"type": "string"
},
{
"name": "price",
"type": "string"
}
]
}
図21 登録されたAvroのスキーマ
{
"createdOn": 1575919739708,
"modifiedOn": 1575919739708,
"id": "prices-value",
"version": 1,
"type": "AVRO",
"globalId": 4
}
図22 プロパティファイル
# Configuration file
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.price-out.connector=smallrye-kafka
mp.messaging.outgoing.price-out.client.id=price-producer
mp.messaging.outgoing.price-out.topic=prices
mp.messaging.outgoing.price-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.price-out.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.price-out.apicurio.registry.url=http://localhost:8081/api
mp.messaging.outgoing.price-out.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy