複数のサブシステムにデータの同期を行いたい場合に、どのようなアーキテクチャにしますか?
重要なこととして、トランザクションが利用できないためにSQLの発行元は、データの一貫性が保てるように配慮する必要があります。
例えば、SQLの実行をLambdaで行った場合、データベースAとデータベースBにそれぞれ書き込みを行う必要がありますが、複数のレコードを更新する場合にはアプリケーションロジックが複雑になりがちです。
そこで、Trigger an AWS Lambda function from Amazon RDS for MySQL or Amazon RDS for MariaDB using audit logs and Amazon CloudWatchのアーキテクチャを紹介します。
この方法であれば、監査ログを使って実際に書き込みされたことをトリガーにしてLambdaが実行されるので、実装がシンプルで、データの同期の実装を後付けで行うこともできます。上記の例では、RDS for MySQL(もしくはMariaDB)になっていますが、冒頭の説明にもあるとおりAurora MySQLでも可能です。
ただこのアーキテクチャの場合には、CloudWatch Logsのサブスクリプションフィルターを使ってLambdaを実行するため、Lambdaの実行ができない場合が考えられます。Lambda関数の同時実行数の上限に達するなどの問題により実行エラー発生時のリトライを考慮したい場合には、CloudWatch LogsのサブスクリプションフィルターからLambdaを起動するのではなく、Kinesis Data Streamsに書き込み、イベントソースマッピングにて対応することができます。
複数のレコードをバッチにまとめてLambda関数で処理することで同時実行数を抑えたり、Lambda関数実行エラー時も自動でリトライ可能となり、最小限の呼び出しで実行できるようになります。
ここで注意したいのが、Aurora MySQL の監査ログの場合、パラメータグループ(character_set_database)で指定された文字コードではなく、UTF-8形式となっていて、CloudWatch LogsからKinesis DataStreamへデータが書き込まれる際に、gzip 形式で圧縮されていることです。
そのため、CloudWatch Logsでは正しく表示されますが、Kinesis Data Streamのデータビュアーでは文字化けして表示されます。また、Lambdaのテストテンプレートにあります kinesis-get-recordsは、「Hello, this is a test 123.」のデータがBase64でエンコードされ、gzip圧縮されていないため、テストする際には以下の方法で事前にデータを準備しておく必要があります。
echo "hoge" > ~/work/tmp/hoge
gzip ~/work/tmp/hoge
cat ~/work/tmp/hoge.gz | base64
ちなみにBase64をデコードする方法は以下の通りです。
echo -n "H4sICAgtzGYAA2hvZ2UAy8hPT+UCAJ2H4rkFAAAA" | base64 -d | zcat
Kinesis DataStream のデータを参照するためのコードは他言語では参考となるものがあるものの、C#のコードがほとんど見あたりませんでした。AWSの公式ドキュメントを参考にして、試行錯誤しながら以下の方法で実装してみました。
using System.Text;
using System.Text.RegularExpressions;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.IO;
using System.IO.Compression;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace SyncStatus;
public class Function
{
Regex reg = new Regex(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)");
public void FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
{
context.Logger.LogInformation($"Beginning to process {kinesisEvent.Records.Count} records...");
foreach (var record in kinesisEvent.Records)
{
context.Logger.LogInformation($"Event ID: {record.EventId}");
context.Logger.LogInformation($"Event Name: {record.EventName}");
try
{
string recordData = GetRecordContents(record.Kinesis);
var messages = JObject.Parse(recordData).GetValue("logEvents");
context.Logger.LogInformation($"Record Data(UTF-8):");
foreach (var item in messages)
{
var log = item["message"].Value<string>();
context.Logger.LogInformation((reg.Split(log))[8].Trim('\'').Replace("\\'", "'"));
}
}
catch (Exception e)
{
context.Logger.LogError(e.Message);
}
}
context.Logger.LogInformation("Stream processing complete.");
}
private string GetRecordContents(KinesisEvent.Record streamRecord)
{
using (var gZipStream = new GZipStream(streamRecord.Data, CompressionMode.Decompress))
using (var memoryStreamOutput = new MemoryStream())
{
gZipStream.CopyTo(memoryStreamOutput);
var outputBytes = memoryStreamOutput.ToArray();
string decompressed = Encoding.UTF8.GetString(outputBytes);
return decompressed;
}
}
}
是非、Kinesis DataStreamを使って楽しいピタゴラスイッチライフを!