シェルスクリプトマガジン

レッドハットのプロダクト(Vol.67記載)

著者:杉本 拓

「Red Hat Integration」はアプリやデータの連携を実現するための、インテグレーションパターン、API 連携、API管理とセキュリティ、データ変換、リアルタイムメッセージング、データストリーミングなどを提供するオープンソース製品です。同製品には多くの機能が含まれていますが、本連載ではその概要と一部の機能を紹介します。

シェルスクリプトマガジン Vol.67は以下のリンク先でご購入できます。

図18 二つのdependencyを追加する

   <dependency>
     <groupId>org.jboss.resteasy</groupId>
     <artifactId>resteasy-jackson2-provider</artifactId>
   </dependency>
   <dependency>
     <groupId>io.apicurio</groupId>
     <artifactId>apicurio-registry-utils-serde</artifactId>
     <version>1.2.1.Final</version>
   </dependency>

図19 「AvroRegistryExample.java」ファイルの内容

package com.redhat.kafka.registry;
 
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
 
import javax.enterprise.context.ApplicationScoped;
 
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
 
import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
 
@ApplicationScoped
public class AvroRegistryExample {
 
   private Random random = new Random();
   private String[] symbols = new String[] { "RHT", "IBM", "MSFT", "AMZN" };
 
   @Outgoing("price-out")
   public Flowable<KafkaRecord<String, Record>> generate() throws IOException {
       Schema schema = new Schema.Parser().parse(
           new File(getClass().getClassLoader().getResource("price-schema.avsc").getFile())
       );
       return Flowable.interval(1000, TimeUnit.MILLISECONDS)
           .onBackpressureDrop()
           .map(tick -> {
               Record record = new GenericData.Record(schema);
               record.put("symbol", symbols[random.nextInt(4)]);
               record.put("price", String.format("%.2f", random.nextDouble() * 100));
               return KafkaRecord.of(record.get("symbol").toString(), record);
           });
   }
}

図20 「price-schema.avsc」ファイルの内容

{
   "type": "record",
   "name": "price",
   "namespace": "com.redhat",
   "fields": [
       {
           "name": "symbol",
           "type": "string"
       },
       {
           "name": "price",
           "type": "string"
       }
   ]
}

図21 登録されたAvroのスキーマ

{
  "createdOn": 1575919739708,
  "modifiedOn": 1575919739708,
  "id": "prices-value",
  "version": 1,
  "type": "AVRO",
  "globalId": 4
}

図22 プロパティファイル

# Configuration file
kafka.bootstrap.servers=localhost:9092
 
mp.messaging.outgoing.price-out.connector=smallrye-kafka
mp.messaging.outgoing.price-out.client.id=price-producer
mp.messaging.outgoing.price-out.topic=prices
mp.messaging.outgoing.price-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.price-out.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
 
mp.messaging.outgoing.price-out.apicurio.registry.url=http://localhost:8081/api
mp.messaging.outgoing.price-out.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy