Пошаговое руководство по использованию DynamoDB Stream с использованием AWS Lambda на C #

Программирование, управляемое событиями, - это модное направление в мире программного обеспечения сегодня. О построении системы со слабосвязанными, независимо развертываемыми и легко масштабируемыми компонентами можно многое сказать. Бессерверные инструменты можно использовать для создания некоторых из этих компонентов; один AWS, что часто означает использование DynamoDB и Lambda. В этой статье мы собираемся создать небольшую систему, управляемую событиями, в которой DynamoDB является нашим источником событий, а функции Lambda вызываются в ответ на эти события. Для этого мы воспользуемся функцией DynamoDB Streams.

После включения DynamoDB Stream для таблицы все изменения в этой таблице записываются и помещаются в поток по порядку. AWS не определяет внутреннюю структуру потока, но они очень похожи на потоки Kinesis (и могут использовать их скрытно). Существует несколько способов использования потока, включая использование клиентской библиотеки Kinesis (предоставляемый AWS SDK ), но один из самых простых способов - настроить поток как триггер для лямбда-функции.

Теперь давайте рассмотрим процесс включения DynamoDB Stream, написания короткой лямбда-функции для получения событий из потока и настройки DynamoDB Stream в качестве триггера для лямбда-функции. Мы рассмотрим его шаг за шагом, но если вы хотите пропустить его до конца и проверить исходный код и Terraform, перейдите прямо к моему репозиторию GitHub. Если вам нужно освежить в памяти основы DynamoDB, прочтите мою статью Начало работы с DynamoDB.

Предпосылки

Чтобы точно следовать этому руководству, вам потребуются Visual Studio 2019 и AWS Toolkit. Я использую эту настройку здесь из-за тесной интеграции с Lambda; Публикация кода в AWS осуществляется одним щелчком мыши.

Я включил файл Terraform для создания таблицы DynamoDB (включая ее индексы и поток). Если вы раньше не использовали Terraform, это фантастический инструмент, который позволяет вам писать свою инфраструктуру в виде кода. Кроме того, вы можете создать и настроить таблицу DynamoDB вручную с помощью Консоли управления AWS.

Создайте таблицу DynamoDB и поток

Начнем с просмотра файла Terraform main.tf. В файле есть только один ресурс (объект инфраструктуры) - наша таблица DynamoDB. В этом ресурсе определяется каждый из атрибутов и индексов таблицы (обзор глобальных и вторичных индексов в предыдущей статье).

Для включения потока требуются два аргумента: stream_enabled (логическое значение)
и stream_view_type (перечисление). Steam_view_type позволяет вам выбрать, сколько данных будет помещено в поток. Вы можете выбрать один из четырех вариантов:

  • KEYS_ONLY - только раздел и ключ сортировки для измененной записи
  • NEW_IMAGE - все атрибуты обновленной записи
  • OLD_IMAGE - все атрибуты записи до модификации
  • NEW_AND_OLD_IMAGES - все атрибуты до и после модификации

В нашем примере мы выберем NEW_AND_OLD_IMAGES, чтобы иметь как можно больше информации. Как правило, вам нужен вариант, который соответствует вашим потребностям при использовании минимального объема данных.

Если вы следуете примеру и хотите использовать Terraform для развертывания этой базы данных в AWS, единственное необходимое предварительное условие - наличие общих учетных данных AWS, хранящихся на вашем компьютере (если вы когда-либо использовали AWS CLI, вы все готово.) Скачайте Terraform и добавьте в свой путь. Отредактируйте файл main.tf и измените profile на имя вашего собственного профиля AWS. Затем откройте командную строку в каталоге, содержащем файл .tf, и выполните следующие две команды:

Terraform выполнит все API-интерфейсы AWS, необходимые для создания вашей инфраструктуры. Если вы внесете изменения в свой файл терраформа, вы можете снова запустить «терраформ применить», чтобы выполнить обновления. Когда вы закончите использовать его, выполнение «terraform destroy» удалит все ваши изменения из среды AWS.

Создайте лямбду для использования потока

Имея нашу таблицу на месте, давайте напишем код для лямбда-функции, которая будет ее использовать. Откройте Visual Studio 201 и создайте новый проект AWS Lambda (.NET Core C #); вы можете использовать этот шаблон после установки AWS Toolkit для Visual Studio. Когда будет предложено выбрать Blueprint, выберите «пустую функцию», поскольку мы хотим начать с нуля.

Взгляните на зависимости NuGet для нашего проекта. В шаблон уже добавлены пакеты, необходимые для базовой функции Lambda.

Нам нужно будет добавить еще одну зависимость. В окне «Управление пакетами Nuget» найдите и установите Amazon.Lambda.DynamoDBEvents. Исходный код нашей лямбды приведен ниже; взгляните на это, и тогда мы поговорим об этом:

Вверху мы добавили директиву using для пространства имен Amazon.Lambda.DynamoDBEvents. Затем мы переходим к функции Lambda, которую предоставляет шаблон (FunctionHandler), которая принимает в качестве параметров строку и ILambdaContext. ILambdaContext содержит полезную информацию о функции Lambda, такую ​​как ограничение памяти, оставшееся время выполнения и доступ к ведению журнала, поэтому мы ее сохраним. Мы заменили строковый параметр на DynamoDBEvent; этот объект содержит список объектов DynamodbStreamRecord. Каждая запись DynamodbStreamRecord представляет собой описание отдельного изменения данных в таблице DynamoDB.

Когда мы настраиваем Lambda для прослушивания DynamoDB Stream, мы выбираем максимальный размер пакета и временное окно, в котором этот пакет собирается. Из-за этого наша функция должна иметь возможность обрабатывать более одного DynamodbStreamRecord при каждом вызове. Поэтому мы добавили цикл для обхода каждого элемента DynamoDBEvent и выполнения некоторых действий. В нашем примере действие заключается в использовании ILambdaContext.Logger для записи информации о событии в CloudWatch, но вы можете изменить внутреннюю структуру функции в соответствии с вашим вариантом использования.

Имея готовый код, мы можем опубликовать функцию в AWS, щелкнув проект правой кнопкой мыши в обозревателе решений и выбрав «Опубликовать в AWS Lambda». Выберите имя и регион AWS для новой функции Lambda.

На следующем экране будет запрошена дополнительная информация о лямбде. Единственное изменение, которое мы внесем в значения по умолчанию, - это выбор роли IAM; это определяет привилегии, которые наша Lambda должна будет взаимодействовать с другими сервисами AWS во время выполнения. Создайте новую роль на основе AWSLambdaDynamoDBEecutionRole - это позволяет просматривать и читать потоки DynamoDB, а также записывать в журналы CloudWatch. Затем загрузите свою функцию в облако.

Конфигурация Lambda - Консоль AWS

Для заключительных шагов перейдите в раздел Lambda консоли AWS и выберите свою функцию, которую мы только что опубликовали.

Мы собираемся добавить триггер для запуска нашей лямбда-функции всякий раз, когда записи добавляются в наш DynamoDB Stream. Выберите «добавить триггер» на экране конфигурации функции и выберите DynamodDB. Вам будет представлен экран ниже, на котором вы сможете составить подробную информацию о нашем источнике событий.

Убедитесь, что у вас выбрана нужная таблица DynamoDB, а затем просмотрите другие варианты. Размер пакета - это максимальное количество DynamodbStreamRecords, которое будет отправлено нашей функции за выполнение. Пакетное окно определяет частоту в секундах, с которой поток будет читать события из таблицы и вызывать нашу лямбда-функцию. В этом примере будет достаточно вариантов по умолчанию, но в реальном мире вам нужно адаптировать их к своему варианту использования. Более короткое пакетное окно имеет смысл для приложения, которое должно реагировать на обновления почти в реальном времени, в то время как более длинное окно (и больший размер пакета) может быть более экономичным в сценариях, в которых обработка одного элемента является быстрой операцией и можно делать паузу между пакетными процессами.

Проверить и подтвердить

Пришло время опробовать то, что мы создали. Перейдите к своей таблице DynamoDB в консоли AWS и начните вносить изменения. Добавьте несколько новых записей. Измените существующую запись. Как мы узнаем, что наш Stream и Lambda работают? Что ж, поскольку наша Lambda пишет в CloudWatch, мы должны проверить логи. Перейдите к CloudWatch в консоли AWS и выберите группы журналов на боковой панели.

Вы должны увидеть группу с тем же именем, что и ваша лямбда-функция. Откройте его, и вы найдете список потоков журналов. Один поток будет создан для каждого контекста выполнения вашей лямбда-функции. Для справки: при первом вызове вашей функции создается контекст выполнения (временная среда выполнения для вашей функции). Последовательные вызовы вашей функции будут попадать в один и тот же контекст до тех пор, пока не пройдет около десяти минут, когда ваша функция не выполнится. После этого периода восстановления повторный вызов вашей Lambda вызовет загрузку функции в новый контекст выполнения (это называется холодным запуском). Множественные одновременные вызовы вашей функции также могут привести к созданию нового контекста выполнения, потому что выполнение context может обрабатывать только один запрос за раз.

В любом случае вы должны увидеть хотя бы один поток журнала. Если нет, попробуйте обновить страницу; иногда журналам требуется время, чтобы распространиться. Если вам по-прежнему не повезло, проверьте триггер, настроенный для лямбда-выражения, и убедитесь, что он настроен на правильную таблицу DynamoDB. Кроме того, если размер вашего пакетного окна установлен на любое значение больше нуля, вам придется подождать, пока он не истечет, прежде чем Stream соберет все изменения таблицы. Когда вы щелкаете по потоку, вы должны увидеть журналы, подобные этому:

Вы также можете зайти в CloudWatch Metrics и отобразить дополнительную информацию о своей функции. Ниже вы найдете интересный график вызовов нашей функции; вы можете видеть, что мы перешли от нуля к единице в 2:25.

Резюме

Потоки позволяют реализовать интересные варианты использования, которые иначе сложно поддерживать в DynamoDB. Мы можем послать пользователю письмо по электронной почте, когда он достигнет своего текущего рекорда. Мы также можем агрегировать данные и записывать их в другую таблицу; возможно, мы будем отслеживать, сколько раз за определенный промежуток времени менялся рекорд игры. Потоки также позволяют репликацию данных; это может быть полезно в сценарии, когда мы хотим запускать аналитические запросы в базе данных, лучше подходящей для этого типа деятельности (например, SQL Server или Redshift). Потратьте несколько минут, чтобы опробовать их и посмотреть, подходят ли они для варианта использования в ваши будущие приложения.