AWS SWF (Simple Workflow Service)のSplit Mergeサンプル(Java)

| コメントをどうぞ

前記事 AWS SWF (Simple Workflow Service)のBookingサンプル(Java) に続いて、Split Mergeサンプルを取り上げます。このサンプルでは1つのアクティビティを非同期に複数回呼び出してチャンクごとの合計値を計算し、最後に平均値を表示します。

事前準備とビルド

Helloworldと同じです。AWS SWF (Simple Workflow Service)を使ってみる(Java) の事前準備とビルドを参照してください。
このサンプルではS3のファイルにアクセスします。該当のファイルは split-merge-sample/input.txt でPublicにアクセスできます。

ActivityHost

aws-java-sdk-1.8.9.1\samples\AwsFlowFrameworkからコマンドプロンプトを開いて、以下のコマンドを実行すると起動されます。

ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.splitmerge.ActivityHost" run

ソースを見ると、AverageCalculatorActivitiesImplインスタンスをアクティビティに登録しています。
このインスタンスのコンストラクタには、AWS S3の設定情報を渡しており、S3へアクセスできるようになります。
メソッドが3つあり、AWS管理コンソールで確認すると3つのアクティビティ(AverageCalculatorActivities.*, computeDataSize)が登録されています。

WorkflowHost

aws-java-sdk-1.8.9.1\samples\AwsFlowFrameworkからコマンドプロンプトを開いて、以下のコマンドを実行すると起動されます。

ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.splitmerge.WorkflowHost" run 

ソースを見ると、AverageCalculatorWorkflowImplクラスをワークフローに登録しています。
ここではPartitionedAverageCalculatorに処理を委譲し、処理結果を表示しています。

PartitionedAverageCalculator calculator = new PartitionedAverageCalculatorImpl(numberOfWorkers, bucketName);
Promise<Double> result = calculator.computeAverage(fileName);        
calculator.reportResult(result);        

AWS管理コンソールで確認するとSplitMergeWorkflowExampleが登録されています。

PartitionedAverageCalculator

指定されたファイルを分割して、分割されたチャンクごとに非同期にワークフローを呼び出して合計を計算します。

    @Asynchronous
    private Promise<Double> computeAverageDistributed(String inputFile, Promise<Integer> dataSize) {
        int chunkSize = dataSize.get() / numberOfWorkers;

        // Create an array list to hold the result returned by each worker
        List<Promise<Integer>> results = new ArrayList<Promise<Integer>>();
        for (int chunkNumber = 0; chunkNumber < numberOfWorkers; chunkNumber++) {
            // Splitting computation for each chunk as separate activity
            results.add(client.computeSumForChunk(bucketName, inputFile, chunkNumber, chunkSize));
        }
        // Merge phase
        return mergeSumAndComputeAverage(results, dataSize.get());
    }

WorkflowExecutionStarter

aws-java-sdk-1.8.9.1\samples\AwsFlowFrameworkからコマンドプロンプトを開いて、以下のコマンドを実行すると起動されます。

ant -f build.xml -Dmain-class="com.amazonaws.services.simpleworkflow.flow.examples.splitmerge.WorkflowExecutionStarter" run

実行するとActivityHostのコンソールにメッセージが表示されます。非同期にチャンクごとの合計を計算していることが確認できます。

Sum from '41' to '50' is: '455'
Sum from '11' to '20' is: '155'
Sum from '31' to '40' is: '355'
Sum from '1' to '10' is: '55'
Sum from '21' to '30' is: '255'
Average is:  25.50.

ソースを見ると、クライアントを呼び出しています。

AverageCalculatorWorkflowClientExternalFactory clientFactory = new AverageCalculatorWorkflowClientExternalFactoryImpl(swfService, domain);
AverageCalculatorWorkflowClientExternal workflow = clientFactory.getClient();
workflow.average(bucketName, fileName, numberOfWorkers);

AWS管理コンソールのWorkflow Executionsの一覧から実行した履歴を選択し、Activitiesタブを選択すると3つのアクティビティの内AverageCalculatorActivities.computeSumForChunkが5回呼び出された履歴が確認できます。
aws-swf-split-merge1

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

次のHTML タグと属性が使えます: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>