Trying out AWS Kinesis (Java) (Part 2)


Following the previous article, Trying out AWS Kinesis (Java) (Part 1), I built a Kinesis application that receives records.

Preparation

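Compiling and running a Kinesis application requires the Kinesis Client Library. The file amazon-kinesis-client-1.0.0.jar is available in the Maven repository; download it and add it to the CLASSPATH. The library depends on the AWS SDK for Java, so the SDK jars used in the previous article must also stay on the CLASSPATH.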
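A quick way to confirm that the jar really is on the CLASSPATH is to try loading one of its classes. The minimal sketch below does that for the Worker class that the application imports; ClasspathCheck and isOnClasspath are hypothetical helper names. Loading a class without initializing it does not prove that every dependency (such as the AWS SDK) is present, so treat a true result as a first check only.

```java
// Hypothetical helper: reports whether a class can be loaded from the current CLASSPATH.
final class ClasspathCheck {
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: locate and load the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String worker = "com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker";
        System.out.println(worker + " on CLASSPATH: " + isOnClasspath(worker));
    }
}
```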

Set aws_access_key_id and aws_secret_access_key in %USER_HOME%\.aws\credentials (the credentials file under the user's home directory).
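The credentials file uses an INI-style layout: a [profile] header followed by key = value lines, and ProfileCredentialsProvider() with no arguments reads the default profile unless told otherwise. To illustrate that layout only, here is a hypothetical minimal reader (CredentialsFileSketch is not the SDK's parser, and the key values are placeholders).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal reader for the INI-style credentials file (illustration only; not the SDK's parser).
final class CredentialsFileSketch {
    // Returns profile name -> (key -> value).
    static Map<String, Map<String, String>> parse(String content) {
        Map<String, Map<String, String>> profiles = new HashMap<>();
        Map<String, String> current = null;
        for (String raw : content.split("\\r?\\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // blank line or comment
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                // A "[name]" header starts a new profile section.
                current = new HashMap<>();
                profiles.put(line.substring(1, line.length() - 1).trim(), current);
            } else if (current != null && line.indexOf('=') > 0) {
                // "key = value" inside a profile section.
                int eq = line.indexOf('=');
                current.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return profiles;
    }

    public static void main(String[] args) {
        String sample = "[default]\n"
                + "aws_access_key_id = YOUR_ACCESS_KEY_ID\n"
                + "aws_secret_access_key = YOUR_SECRET_ACCESS_KEY\n";
        Map<String, String> defaultProfile = parse(sample).get("default");
        System.out.println(defaultProfile.get("aws_access_key_id")); // prints "YOUR_ACCESS_KEY_ID"
    }
}
```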

Kinesis application

Here is the Kinesis application I implemented.

import java.io.IOException;
import java.net.InetAddress;
import java.util.UUID;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public final class SampleKinesisApplication {
    private static String applicationName = "SampleKinesisApplication";
    private static String streamName = "myFirstStream";
    private static String kinesisEndpoint = "https://kinesis.ap-northeast-1.amazonaws.com";
    // Initial position in the stream when the application starts up for the first time.
    // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
    private static InitialPositionInStream initialPositionInStream = InitialPositionInStream.TRIM_HORIZON;
    private static KinesisClientLibConfiguration kinesisClientLibConfiguration;

    public static void main(String[] args) throws IOException {
        configure();
        System.out.println("Starting " + applicationName + " to process stream " + streamName);
        IRecordProcessorFactory recordProcessorFactory = new SampleRecordProcessorFactory();
        Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            t.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    private static void configure() throws IOException {
        // ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
        java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
        // Unique ID for this worker process: host name plus a random UUID.
        String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        System.out.println("Using workerId: " + workerId);
        AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
        kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
                applicationName, streamName, credentialsProvider, workerId)
                .withInitialPositionInStream(initialPositionInStream)
                .withKinesisEndpoint(kinesisEndpoint);
    }

    private static class SampleRecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new SampleRecordProcessor();
        }
    }
}
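Note that the Worker never creates SampleRecordProcessor directly; it asks SampleRecordProcessorFactory for one. As I understand KCL 1.x, the Worker creates one processor per shard it holds a lease on and calls initialize(shardId) on it, which is why SampleRecordProcessor can keep per-shard state such as kinesisShardId and nextCheckpointTimeInMillis in plain instance fields. The sketch below is a simplified model of that arrangement, not the KCL's own code; ShardProcessor, ShardProcessorFactory, EchoProcessor, EchoFactory and assign are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that only mirror the shape of IRecordProcessor / IRecordProcessorFactory.
interface ShardProcessor {
    void initialize(String shardId);
    String describe();
}

interface ShardProcessorFactory {
    ShardProcessor createProcessor();
}

final class PerShardDemo {
    // Keeps per-shard state in an instance field, like kinesisShardId in SampleRecordProcessor.
    static final class EchoProcessor implements ShardProcessor {
        private String shardId;

        @Override
        public void initialize(String shardId) {
            this.shardId = shardId;
        }

        @Override
        public String describe() {
            return "processor for " + shardId;
        }
    }

    static final class EchoFactory implements ShardProcessorFactory {
        @Override
        public ShardProcessor createProcessor() {
            return new EchoProcessor(); // a fresh instance on every call
        }
    }

    // Simplified model of the worker side: one processor per shard, each initialized with its shard ID.
    static List<ShardProcessor> assign(ShardProcessorFactory factory, List<String> shardIds) {
        List<ShardProcessor> processors = new ArrayList<>();
        for (String shardId : shardIds) {
            ShardProcessor processor = factory.createProcessor();
            processor.initialize(shardId);
            processors.add(processor);
        }
        return processors;
    }

    public static void main(String[] args) {
        List<ShardProcessor> processors = assign(new EchoFactory(),
                Arrays.asList("shardId-000000000000", "shardId-000000000001"));
        for (ShardProcessor p : processors) {
            System.out.println(p.describe());
        }
    }
}
```

Because each shard gets its own instance, two shards never share a checkpoint timer or a shard ID field.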

Next, implement the record processor that SampleRecordProcessorFactory instantiates.

import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class SampleRecordProcessor implements IRecordProcessor {
    private String kinesisShardId;
    // Backoff and retry settings
    private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
    private static final int NUM_RETRIES = 10;
    // Checkpoint about once a minute
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private long nextCheckpointTimeInMillis;
    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    @Override
    public void initialize(String shardId) {
        System.out.println("Initializing record processor for shard: " + shardId);
        this.kinesisShardId = shardId;
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        System.out.println("Processing " + records.size() + " records from " + kinesisShardId);
        processRecordsWithRetries(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    private void processRecordsWithRetries(List<Record> records) {
        for (Record record : records) {
            boolean processedSuccessfully = false;
            String data = null;
            for (int i = 0; i < NUM_RETRIES; i++) {
                try {
                    data = decoder.decode(record.getData()).toString();
                    System.out.println(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data);
                    processedSuccessfully = true;
                    break;
                } catch (CharacterCodingException e) {
                    e.printStackTrace();
                    break;
                } catch (Throwable t) {
                    t.printStackTrace();
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (!processedSuccessfully) {
                System.err.println("Couldn't process record " + record + ". Skipping the record.");
            }
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        System.out.println("Shutting down record processor for shard: " + kinesisShardId);
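        // TERMINATE: the shard was closed (e.g. after a split or merge), so checkpoint to record that it was
        // fully processed. ZOMBIE means the lease was lost to another worker, so no checkpoint is taken.
        if (reason == ShutdownReason.TERMINATE) {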
            checkpoint(checkpointer);
        }
    }

    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        System.out.println("Checkpointing shard " + kinesisShardId);
        for (int i = 0; i < NUM_RETRIES; i++) {
            try {
                checkpointer.checkpoint();
                break;
            } catch (ShutdownException se) {
                se.printStackTrace();
                break;
            } catch (ThrottlingException e) {
                if (i >= (NUM_RETRIES - 1)) {
                    System.err.println("Checkpoint failed after " + (i + 1) + " attempts.");
                    break;
                } else {
                    System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + NUM_RETRIES);
                }
            } catch (InvalidStateException e) {
                System.err.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.");
                break;
            }
            try {
                Thread.sleep(BACKOFF_TIME_IN_MILLIS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
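processRecords does not checkpoint on every batch. It checkpoints only when the current time has passed nextCheckpointTimeInMillis, then moves that deadline CHECKPOINT_INTERVAL_MILLIS (60 seconds) ahead. Because the field starts at 0, the very first batch always checkpoints. The trade-off is that after a restart the KCL resumes from the last checkpoint stored in DynamoDB, so up to about a minute of records may be delivered again. The hypothetical CheckpointThrottle class below isolates that timing logic and takes the time as an argument, so it can be tried without a real clock or a Kinesis stream.

```java
// Hypothetical helper reproducing the "checkpoint about once a minute" logic from processRecords,
// with the current time passed in instead of calling System.currentTimeMillis().
final class CheckpointThrottle {
    private final long intervalMillis;
    private long nextCheckpointTimeInMillis; // starts at 0, so the first batch checkpoints

    CheckpointThrottle(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true if a checkpoint is due at nowMillis, and if so schedules the next one.
    boolean shouldCheckpoint(long nowMillis) {
        if (nowMillis > nextCheckpointTimeInMillis) {
            nextCheckpointTimeInMillis = nowMillis + intervalMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointThrottle throttle = new CheckpointThrottle(60000L);
        long[] batchTimes = {1000L, 30000L, 61000L, 62000L};
        for (long t : batchTimes) {
            // prints true, false, false, true (the deadline comparison is strict)
            System.out.println("batch at " + t + " ms -> checkpoint? " + throttle.shouldCheckpoint(t));
        }
    }
}
```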

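Here is the output from running the Kinesis application. Each record line shows the sequence number, the partition key, and the data.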

Using workerId: xxx:26e53d79-b0ab-4357-a762-36b3f4702d53
Starting SampleKinesisApplication to process stream myFirstStream
Processing 10 records from shardId-000000000000
49543106992521154367174367464671098504452954621114056706, partitionKey-0, testData-0
49543106992521154367174367464672307430272569250288762882, partitionKey-1, testData-1
49543106992521154367174367464673516356092183879463469058, partitionKey-2, testData-2
49543106992521154367174367464674725281911798508638175234, partitionKey-3, testData-3
49543106992521154367174367464675934207731413137812881410, partitionKey-4, testData-4
49543106992521154367174367464677143133551027766987587586, partitionKey-5, testData-5
49543106992521154367174367464678352059370642396162293762, partitionKey-6, testData-6
49543106992521154367174367464679560985190257025336999938, partitionKey-7, testData-7
49543106992521154367174367464680769911009871654511706114, partitionKey-8, testData-8
49543106992521154367174367464681978836829486352405889026, partitionKey-9, testData-9

You can confirm that records are received each time the PutRecord program from the previous article is run.
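The sequence numbers in this output are 56-digit decimal strings, far beyond the 19 digits that fit in a long, and within a shard they generally increase over time. To compare them numerically, BigInteger works, whereas plain string comparison is only safe when the lengths are equal. The hypothetical SequenceOrderCheck class below is a minimal sketch that pulls the sequence number from a log line in the format printed above and checks that a list is in ascending order.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for checking the order of sequence numbers taken from the log output.
final class SequenceOrderCheck {
    // Log lines look like "<sequenceNumber>, <partitionKey>, <data>".
    static String sequenceNumberOf(String logLine) {
        return logLine.substring(0, logLine.indexOf(',')).trim();
    }

    static boolean isStrictlyIncreasing(List<String> sequenceNumbers) {
        BigInteger previous = null;
        for (String s : sequenceNumbers) {
            BigInteger current = new BigInteger(s); // too large for long, so use BigInteger
            if (previous != null && current.compareTo(previous) <= 0) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> logLines = Arrays.asList(
                "49543106992521154367174367464671098504452954621114056706, partitionKey-0, testData-0",
                "49543106992521154367174367464672307430272569250288762882, partitionKey-1, testData-1",
                "49543106992521154367174367464673516356092183879463469058, partitionKey-2, testData-2");
        List<String> seqs = new ArrayList<>();
        for (String line : logLines) {
            seqs.add(sequenceNumberOf(line));
        }
        System.out.println(isStrictlyIncreasing(seqs)); // prints "true"
    }
}
```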
