Creating tasks (to-dos) through the Habitica UI is neither the most efficient nor the most pleasant experience (although tools like Trello or Jira are hardly better). Editing a single plain text file, whether formatted as Markdown or not, is much easier and gives a better overview, at least when each task is just a short title without a long description. In the previous post I demonstrated a way to measure overall productivity or performance in Habitica. However, I use Habitica only for dailies and repeating tasks, which I don't have to define in the UI every time. For one-time to-dos the situation is very different. I want simplicity and efficiency. And what is more efficient than plain text? I will use the Habitica API again to build a solution that transforms a text file into a list of tasks in Habitica, including applying edits. I will also show you how to work around the API's low rate limit.
The completed project is tagged v3 and built on top of previous posts.
- Repo: https://github.com/ppabis/habitica-item-seller/tree/v3
- Part 1: Use AWS Serverless to sell items in Habitica
- Part 2: Track your performance using Habitica, Timestream and Grafana
Processing a list of tasks
The lists of tasks will be uploaded to an S3 bucket. An event notification in this bucket will trigger a Lambda function that will process this file and create and update tasks. The format for the tasks will be the following:
ID due date difficulty+attribute - task description
0001. 15/07/2024 TP - Wash the dishes
0002. HI - Create a new blog post
The ID will be used to track the tasks in DynamoDB at a later stage. You could just rely on the line number, but I want to ensure consistency. This also allows splitting tasks between multiple files. The due date can be a date in DD/MM/YYYY format or an empty string if there is no due date. The difficulties are Trivial, Easy, Medium and Hard; they will be mapped to the appropriate priority values in Habitica. Attributes are not visible in the Habitica UI. The attribute determines which of the player's stats multiplies the task's value on completion: Strength, Intelligence, Perception or Constitution. Any line that does not follow this format is simply discarded; we will use a regular expression to verify it. In the example above, the first line will be ignored. This lets us keep comments or extra details next to each task without them being processed.
Let's define the first function, which turns the task list into Python objects. It splits the input data into lines and then parses each line to produce a dictionary with all the task parameters. We also wrap each line-processing call in a try/except block so that a badly formatted date only prints an error and processing continues.
import re
from datetime import datetime

DIFFICULTIES = { 'T': '0.1', 'E': '1', 'M': '1.5', 'H': '2' }
ATTRIBUTES = { 'S': 'str', 'I': 'int', 'P': 'per', 'C': 'con' }

def line_to_task(line: str) -> dict | None:
    # (Task ID). (Date?) (Difficulty)(Attribute) - (Title), feel free to adapt to your needs.
    # The dot after the ID is escaped so that only a literal '.' matches.
    match = re.match(r'^(\d+)\.\s+([0-9/]*)\s*([TEMHtemh])([SIPCsipc])\s+-\s+(.*)$', line)
    if match:
        # strptime raises ValueError for impossible dates such as 66/12/3033
        date = None if match.group(2) == '' else datetime.strptime(match.group(2), '%d/%m/%Y')
        difficulty = DIFFICULTIES[match.group(3).upper()]
        attribute = ATTRIBUTES[match.group(4).upper()]
        return {
            'id': match.group(1),
            'date': date,
            'difficulty': difficulty,
            'attribute': attribute,
            'title': match.group(5),
        }
    return None

def parse_task_list(task_list: str) -> list[dict]:
    tasks = []
    for line in task_list.split('\n'):
        try:
            task = line_to_task(line.strip())
            if task:
                tasks.append(task)
        except Exception as e:
            # A malformed line is reported and skipped, the rest is still parsed
            print(f"Error processing line '{line}': {e}")
    return tasks
I tested the function with the following task list and received the correct output for all values. I formatted the output to be reasonably readable so I could check that every value is in its place.
Input
This is an unrelated lien taht should be ignored
0001. 01/08/2024 MI - Create a list of tasks
0002. 02/08/2024 ES - Create another list of tasks
0003. TS - A task with no due date!
This is not a task
0004. This is also not a task
0005. 03/10/2024 - Also this is also not a task
0006. TC - test 12345 this should be a task - with a hyphen -- extra - hyphens
0007. 05/08/2024 HP - 😂 emojis 🏆
0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong
0234. HC - test 0234 this should be a task
Output
Error processing line '0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong': time data '66/12/3033' does not match format '%d/%m/%Y'
0001: difficulty=1.5 attribute=int date=2024-08-01 00:00:00
Create a list of tasks
0002: difficulty=1 attribute=str date=2024-08-02 00:00:00
Create another list of tasks
0003: difficulty=0.1 attribute=str date=None
A task with no due date!
0006: difficulty=0.1 attribute=con date=None
test 12345 this should be a task - with a hyphen -- extra - hyphens
0007: difficulty=2 attribute=per date=2024-08-05 00:00:00
😂 emojis 🏆
0234: difficulty=2 attribute=con date=None
test 0234 this should be a task
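The readable output above can be produced by a small helper along these lines. This is only a sketch: `format_task` is a hypothetical name that is not part of the repository, and it assumes nothing beyond the dictionary keys returned by `line_to_task`.

```python
from datetime import datetime


def format_task(task: dict) -> str:
    """Render one parsed task as two lines: a header and the indented title."""
    header = (
        f"{task['id']}: difficulty={task['difficulty']} "
        f"attribute={task['attribute']} date={task['date']}"
    )
    return f"{header}\n    {task['title']}"


# Hand-built task dict in the same shape as line_to_task's return value
sample = {
    'id': '0001',
    'date': datetime(2024, 8, 1),
    'difficulty': '1.5',
    'attribute': 'int',
    'title': 'Create a list of tasks',
}
print(format_task(sample))
```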
Storing and updating tasks
Now we have to decide which tasks need to be created and which ones need to be updated. We need to map our task IDs to Habitica's UUIDs, and we will use DynamoDB for that. It is also more efficient to compare the contents of a task with an entry in DynamoDB than to query the Habitica API for each task. To keep things simple, I will deliberately skip handling deletion of tasks. For that we would need to either rely on file differences (removed lines) or scan the entire table and compare it with what we loaded from the file. (Alternatively, my idea was that a - in front of the ID would remove the task on one file upload, after which the line could be safely deleted from the file for subsequent uploads, but this post was already very long 😅.) Our primary key will be the task ID. As it always increases, it shouldn't pose any problems. The IDs don't have to increase by one: you can use 10000, 20000 and so on to split projects or just keep some order or grouping.
I will first define the simplest function, which creates a new task. It returns a formatted dict that can be submitted directly to DynamoDB. The date field will always be present: either an empty string or an ISO 8601 timestamp such as 2024-07-08T18:33:56. This is simpler than checking whether date exists in the record.
def create_task(task: dict) -> dict:
    return {
        'id': task['id'],
        'title': task['title'],
        'date': task['date'].isoformat() if task['date'] else "",
        'difficulty': task['difficulty'],
        'attribute': task['attribute']
    }
The next function updates a task that already exists in the DynamoDB table. It compares each field separately and returns the changed object if any changes were made, or None when the row shouldn't be updated.
def compare_and_update(task: dict, item: dict) -> dict | None:
    dirty = False
    task['date'] = task['date'].isoformat() if task['date'] else ""
    if task['title'] != item['title']:
        item['title'] = task['title']
        dirty = True
    if task['date'] != item['date']:
        item['date'] = task['date']
        dirty = True
    # ... continues for other fields...
    return item if dirty else None
As the last point, we will combine both functions to create, update or skip each task. However, here comes a twist: Habitica allows batch-creating new tasks but can only update existing ones one by one. To save time and executions, we will collect the tasks to be created and submit them immediately in this function. Tasks that need to be updated will be updated in DynamoDB, and their IDs will be passed on to a Step Function. Why? The Habitica API is very strict about the number of calls we can make (30 per minute). We could use a simple sleep in the Lambda, but then we would pay for the time the function spends idle. A Step Function has a Wait block that can save us a bit of money. The process will look something like the diagram below.
How do we determine whether a task is new or needs updating? We don't have to reach out to the API. Tasks that are already known have a UUID that maps them to Habitica, so we just need to check this one attribute. If it is there and a change was detected by the function above, we update the row in DynamoDB and add the ID to the list. The next function in the chain will simply load the row again from DynamoDB and send a request to Habitica.
def store_update_tasks(tasks: list[dict]) -> list[str]:
    # ddb is the DynamoDB Table resource created together with the other clients
    updated_ids, new_tasks = [], []
    for task in tasks:
        # Update a task in DynamoDB if the input is different than what is in
        # the database. If the record in the database does not have UUID, it
        # means that it needs to be created in Habitica.
        response = ddb.get_item(Key={'id': task['id']})
        if 'Item' in response:
            updated = compare_and_update(task, response['Item'])
            if updated:
                ddb.put_item(Item=updated)
                if 'habitica_uuid' in updated and updated['habitica_uuid']:
                    # This item exists in Habitica
                    updated_ids.append(updated['id'])
                else:
                    # This item was existing but wasn't submitted to Habitica yet
                    new_tasks.append(updated)
            else:
                print(f"Task {task['id']} is up to date.")
        else:
            # Task not found so create a new one
            ddb.put_item(Item=create_task(task))
            new_tasks.append(task)

    ids_uuids = batch_create_tasks(new_tasks) # Dummy function for now
    # Update rows in DynamoDB to hold UUIDs of the recently created tasks
    for local_id, uuid in ids_uuids:
        ddb.update_item(
            Key={'id': local_id},
            UpdateExpression='SET habitica_uuid = :uuid',
            ExpressionAttributeValues={':uuid': uuid}
        )
    return updated_ids

def batch_create_tasks(tasks) -> list[tuple[str, str]]:
    # Dummy function for now
    return [(task['id'], f"123456-{task['id']}") for task in tasks]
As the last step, we have to create the Lambda handler that will be the target of the S3 event on object upload or update, so the Lambda is triggered whenever we create a new task list or change an existing one. If you plan on using multiple files in S3, this function handles that as well, but you need to keep the IDs unique across all lists. We will iterate through all records in the event in case the function was triggered for multiple uploads, and process each object inside a try/except block so that one bad file won't crash the entire run.
# imports, clients...

def process_file(record) -> list[str]:
    obj = s3.get_object( Bucket=record['s3']['bucket']['name'], Key=record['s3']['object']['key'] )
    tasks = parse_task_list(obj['Body'].read().decode('utf-8'))
    ids = store_update_tasks(tasks)
    print(f"Tasks to update: {', '.join(ids)}")
    return ids

def lambda_handler(event, context):
    ids = []
    for record in event['Records']:
        if record['eventName'].startswith('ObjectCreated'):
            try:
                _ids = process_file(record)
                ids.extend(_ids)
            except Exception as e:
                print(f"Error processing record: {e}")

    # Here we will start Step Function if ids is not empty so if there are any
    # tasks to be updated in Habitica
    if ids:
        print(f"Dummy - Starting step function for update {', '.join(ids)}")

    return {
        'statusCode': 200,
        'body': ids
    }
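For local testing it helps to know the shape of the event the handler receives. The sketch below builds a trimmed-down S3 notification by hand and extracts the bucket and key from it. One detail worth keeping in mind is that object keys arrive URL-encoded in S3 events, so a key containing spaces needs decoding before it is passed to `get_object`. The `extract_objects` helper is an illustrative addition and not part of the repository.

```python
from urllib.parse import unquote_plus


def extract_objects(event: dict) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs for every ObjectCreated record in the event."""
    objects = []
    for record in event.get('Records', []):
        if not record.get('eventName', '').startswith('ObjectCreated'):
            continue
        bucket = record['s3']['bucket']['name']
        # Keys are URL-encoded in the notification, e.g. spaces become "+"
        key = unquote_plus(record['s3']['object']['key'])
        objects.append((bucket, key))
    return objects


# Hand-written sample containing only the fields used above
sample_event = {
    'Records': [
        {'eventName': 'ObjectCreated:Put',
         's3': {'bucket': {'name': 'habitica-task-list-abcdef123456'},
                'object': {'key': 'my+tasks.txt'}}},
        {'eventName': 'ObjectRemoved:Delete',
         's3': {'bucket': {'name': 'habitica-task-list-abcdef123456'},
                'object': {'key': 'old.txt'}}},
    ]
}
print(extract_objects(sample_event))
```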
Connecting events
This section contains a lot of trial and error on how to connect the S3 bucket, S3 event notifications and Lambda. If you just want a working solution, go to the GitHub repository.
Now comes the tricky part. We used the Serverless Application Model, which is based on CloudFormation, to create the previous Habitica-related projects. So the natural thing to do would be to create a bucket, create a DynamoDB table and connect the bucket to the Lambda through the Events property. Let's try to do it.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: A function that processes task list uploaded to S3 and submits it to Habitica
Globals:
  Function:
    Timeout: 15
    MemorySize: 128
Resources:
  # ... resources from previous posts
  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: habitica-task-list-abcdef123456
  HabiticaTaskListTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      TableName: HabiticaTaskList
  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      Handler: main.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref HabiticaTaskListBucket
        - DynamoDBCrudPolicy:
            TableName: !Ref HabiticaTaskListTable
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Ref HabiticaSecret
      Environment:
        Variables:
          TABLE_NAME: !Ref HabiticaTaskListTable
          HABITICA_SECRET: !Ref HabiticaSecret
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !Ref HabiticaTaskListBucket
            Events: s3:ObjectCreated:*
I will not filter the events; the function will simply process anything that lands in the bucket. After we run sam build and sam deploy, we get an error.
Status: FAILED. Reason: Circular dependency between resources:
[HabiticaTaskListBucket, HabiticaProcessTaskListRole, HabiticaProcessTaskListS3EventPermission, HabiticaProcessTaskList]
OK, so maybe we should create the bucket in a separate stack and then reference its name in this stack. Let's try doing that. I will cut the HabiticaTaskListBucket resource from this stack, deploy it from a different YAML file, export the bucket name as an output and import it into this SAM template. Create a new template.yml in a new directory.
---
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: A bucket for Habitica task lists - bucket.yaml
Resources:
  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: habitica-task-list-abcdef123456
Outputs:
  BucketName:
    Value: !Ref HabiticaTaskListBucket
    Export:
      Name: HabiticaTaskListBucket
Now instead of Bucket: !Ref HabiticaTaskListBucket we will use Bucket: !ImportValue HabiticaTaskListBucket. This takes the bucket name from the exported CloudFormation outputs and inserts it into this stack.
# ...
Resources:
  # This is now in a separate stack
  #HabiticaTaskListBucket:
  #  Type: AWS::S3::Bucket
  #  Properties:
  #    BucketName: habitica-task-list-abcdef123456
  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        - S3ReadPolicy:
            BucketName: !ImportValue HabiticaTaskListBucket
        # ... cut for clarity
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !ImportValue HabiticaTaskListBucket
            Events: s3:ObjectCreated:*
$ cd tasks_bucket
$ sam build
$ sam deploy --guided
$ cd ..
$ sam build
$ sam deploy
...
Invalid Serverless Application Specification document. Number of errors found: 1.
Resource with id [HabiticaProcessTaskList] is invalid. Event with id [S3Event]
is invalid. S3 events must reference an S3 bucket in the same template.
This seems absurd. One possibility is to define the bucket first, deploy it and then connect it. But this contradicts the whole infrastructure-as-code concept, as it introduces manual steps. Our only hope is to use the native CloudFormation AWS::Lambda::Permission resource to give S3 permission to invoke the function, together with NotificationConfiguration in the S3 resource. Let's try that. Remember to destroy the previously created S3 bucket if you tried the import method. In the stack below, the Lambda is created first, then the permission, and the bucket last. It has to happen in this order because the S3 notification requires the permission to be in place already. However, there's still one loop here as well: the read policy for the Lambda refers to the bucket. Unlike in the Events section, though, we can use a simple string parameter and !Ref it in both BucketName properties. Essentially, we have to replace every reference to the bucket resource with a string or a manually constructed ARN, which is not ideal but better than a double deployment.
Parameters:
  HabiticaTaskListBucketName:
    Type: String
    Description: Name of the S3 bucket that will hold the task list
    Default: habitica-task-list-abcdef123456
# ...
Resources:
  HabiticaTaskListBucket:
    Type: AWS::S3::Bucket
    # Makes the creation order described above explicit: permission first, bucket last
    DependsOn: HabiticaProcessTaskListS3Permission
    Properties:
      BucketName: !Ref HabiticaTaskListBucketName
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: s3:ObjectCreated:*
            Function: !GetAtt HabiticaProcessTaskList.Arn
  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        # The policy now references the parameter instead of the bucket resource
        - S3ReadPolicy:
            BucketName: !Ref HabiticaTaskListBucketName
        # ... cut for clarity
      # Events: - removed
  HabiticaProcessTaskListS3Permission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt HabiticaProcessTaskList.Arn
      Principal: s3.amazonaws.com
      SourceArn: !Sub "arn:aws:s3:::${HabiticaTaskListBucketName}"
$ cd tasks_bucket
$ sam delete
$ cd ..
$ sam build && sam deploy
Testing the task list processor
I will upload a file to the S3 bucket and expect the DynamoDB table to contain all the valid tasks. Next, I will replace the file with a version that changes some tasks, and I would like to see the table updated accordingly. For now, we will not POST or PUT anything to Habitica. We will only track what is happening in the logs and in the DynamoDB table.
$ aws s3 cp tasks.txt s3://habitica-task-list-abcdef123456/tasks.txt
$ # ... checking logs, table
$ aws s3 cp tasks2.txt s3://habitica-task-list-abcdef123456/tasks.txt
$ # ... checking logs and table again
The file that I used is the same as in the example above. As the first step, I checked the CloudWatch logs for output from the function. I can see that processing the file went smoothly.
Error processing line '0010. 66/12/3033 EC - this task should be just warned and not crash but is wrong': time data '66/12/3033' does not match format '%d/%m/%Y'
Tasks found in file = 6
Tasks to update:
Now I will scan the DynamoDB table items using the AWS Console. The screenshot below shows the state after the first version of the file and after the updates performed by re-uploading it with some changes. After the second run the logs also show the expected results: two tasks already existed and have to be updated in the next routine. One task was new, so it was silently inserted and mock-created in Habitica.
Tasks found in file = 7
Task 0001 is up to date.
Task 0002 is up to date.
Task 0006 is up to date.
Task 0234 is up to date.
Tasks to update: 0003, 0007
Starting Step Function with 2 tasks.
Preparing a tag in Habitica
Before we create the function that stores every task in Habitica, I suggest creating a tag to mark all tasks that were created automatically. Go to your Habitica dashboard and select Tags at the top of the lists (on mobile it looks different). Select Edit tags and add a new tag for this purpose.
Now we need to retrieve the tag's ID. We could automate this in the Lambda on the first run, but the ID is unlikely to change, so we can retrieve it once and hardcode it. To do this, we can use environment variables and curl. Paste the appropriate value at each read prompt. It won't be echoed because of the -s flag. The tag you created will likely be the last one in the array.
$ read -s HABITICA_USER
$ read -s HABITICA_KEY
$ export HABITICA_USER HABITICA_KEY
$ curl -s -H "x-api-user: $HABITICA_USER" \
-H "x-api-key: $HABITICA_KEY" \
-H "x-client: $HABITICA_USER-taskscheduler10" \
https://habitica.com/api/v3/tags
...
{
"id": "a2e84af7-4b0d-46a3-8dcc-3b94b4205e59",
"name": "Learning"
},
{
"id": "4aedf1fc-8dd7-4ff9-95f1-1f3112a0b815",
"name": "Automated"
}
],
"notifications": [],
"userV": 21793,
"appVersion": "5.26.1"
}
Edit your HabiticaProcessTaskList function's environment variables and add TASK_TAG with the value you received from the commands above.
Function for creating tasks in Habitica
I will read the tag ID from an environment variable. The new function takes the list of new tasks, formats them appropriately and sends them to the Habitica API in a single batch. It then returns a list of tuples, each pairing our task ID with the UUID in Habitica, which we need to save back to DynamoDB. This function is called only once per file update in S3.
I will copy auth.py from the previous project and use it to create the appropriate headers.
import requests, os
from auth import get_headers

HABITICA_URL="https://habitica.com/api/v3"
TASK_TAG = os.getenv("TASK_TAG", "")
HEADERS = get_headers()

def batch_create_tasks(tasks) -> list[tuple[str, str]]:
    habitica_tasks = [ create_task(task, TASK_TAG) for task in tasks ]
    original_ids = [task['id'] for task in tasks]
    url = f"{HABITICA_URL}/tasks/user"
    response = requests.post(url, json=habitica_tasks, headers=HEADERS)
    code = response.status_code
    if code == 200 or code == 201:
        data = response.json()['data']
        # A single task comes back as an object, several tasks as a list
        uuids = [ t['id'] for t in data ] if isinstance(data, list) else [data['id']]
        return list(zip(original_ids, uuids))
    raise Exception(response.json()['message'])

def create_task(task: dict, tag: str = "") -> dict:
    # A helper function to format the task as needed
    data = {
        "text": task['title'],
        "type": "todo",
        "priority": task['difficulty'],
        "attribute": task['attribute']
    }
    if 'date' in task and task['date']:
        # The date is a datetime for freshly parsed tasks and already an ISO
        # string for rows that went through compare_and_update
        date = task['date']
        data['date'] = date if isinstance(date, str) else date.isoformat()
    if tag:
        data['tags'] = [tag]
    return data
I built the SAM template and deployed the updated function again. I checked on Habitica's side whether the new tasks were created and was pleasantly surprised: they were. I also checked the DynamoDB table, and the new UUIDs were in place. Now it's time to implement the last part: updating the tasks in Habitica after they were updated in DynamoDB.
Lambda for updating
Before we create the Step Function that will be triggered by the upstream Lambda that parses the new task list, we will create the downstream Lambda that this Step Function calls. It reads the event containing the list of task IDs to be updated and returns the same list with the processed item removed. For simplicity the event also contains Finished, a boolean that determines whether the loop in the Step Function should stop.
First, we will retrieve the task from DynamoDB and format it so that it fits Habitica's API. We raise exceptions in case of problems, but they won't crash the process because the handler catches and logs them.
import boto3, os

TABLE_NAME = os.getenv('TABLE_NAME')
# Table objects come from the resource API, the low-level client has no Table()
ddb = boto3.resource('dynamodb').Table(TABLE_NAME)

def get_formatted_task(task_id: str) -> tuple[str, dict]:
    row = ddb.get_item(Key={'id': task_id})
    if 'Item' in row:
        # The attribute name matches what the first Lambda stores in DynamoDB
        if 'habitica_uuid' not in row['Item'] or not row['Item']['habitica_uuid']:
            raise Exception(f"Task {task_id} does not have a UUID!")
        task = row['Item']
        return task['habitica_uuid'], format_task(task)
    raise Exception(f"Task {task_id} not found!")

def format_task(task: dict) -> dict:
    formatted = {
        "text": task['title'],
        "priority": task['difficulty'],
        "attribute": task['attribute'],
        "date": None # Can be used for clearing the due date
    }
    if 'date' in task and task['date']:
        formatted['date'] = task['date']
    return formatted
Now we can create a function that sends a request to Habitica to update the task. This one is just a very simple PUT request. auth.py also needs to be copied into this Lambda's directory.
import requests

HABITICA_URL="https://habitica.com/api/v3"

def update_task(headers: dict, uuid: str, data: dict):
    url = f"{HABITICA_URL}/tasks/{uuid}"
    response = requests.put(url, json=data, headers=headers)
    code = response.status_code
    if code == 200:
        return response.json()
    raise Exception(response.json()['message'])
As the last step, we glue the functions together and manage the list of tasks received from the Step Function.
# HEADERS is assumed to be built with get_headers() from auth.py, as in the first Lambda
def lambda_handler(event, context):
    tasks = event.get('List', [])
    if not tasks:
        event['Finished'] = True
        return event

    try:
        task = event['List'][0]
        uuid, task = get_formatted_task(task)
        update_task(HEADERS, uuid, task)
    except Exception as e:
        print(e) # We will continue processing and just log problems

    # Drop the processed ID whether or not the update succeeded
    event['List'] = event['List'][1:]
    # We will also control Step Function's loop from here
    event['Finished'] = len(event['List']) == 0
    return event
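Before wiring the handler into a Step Function, its loop contract can be exercised locally. The sketch below replaces the DynamoDB and Habitica calls with a stub (`fake_update`, a made-up name) and calls a handler of the same shape repeatedly until Finished becomes true, which is what a loop with a wait in between would do.

```python
processed = []


def fake_update(task_id: str) -> None:
    """Stand-in for get_formatted_task + update_task; it only records the call."""
    if task_id == 'bad':
        raise Exception(f"Task {task_id} not found!")
    processed.append(task_id)


def handler(event: dict) -> dict:
    """Same list handling as lambda_handler above, with the API calls stubbed."""
    tasks = event.get('List', [])
    if not tasks:
        event['Finished'] = True
        return event
    try:
        fake_update(tasks[0])
    except Exception as e:
        print(e)  # log and move on, the failed task is still removed
    event['List'] = tasks[1:]
    event['Finished'] = len(event['List']) == 0
    return event


state = {'List': ['0003', 'bad', '0007'], 'Finished': False}
iterations = 0
while not state['Finished']:
    state = handler(state)
    iterations += 1  # the real loop would pause here before the next call

print(processed, iterations)
```

A failing task is dropped from the list rather than retried, so one broken row cannot keep the loop running forever.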
Step Function for updating and delaying each call
Now it's time to define a Step Function that will be triggered by the first Lambda and will execute the second Lambda in a loop with a delay. We can define it in AWS SAM as an AWS::Serverless::StateMachine resource. The definition looks like the code below (we can use YAML to define the states). Our state machine also needs permission to invoke the Lambda. For the loop we use a Choice block that continues to Update Task when the input variable Finished is false and otherwise goes to the Succeed state. The Lambda outputs the transformed variables, which are passed back to Loop Tasks. The Wait block simply passes all values through unchanged.
  HabiticaUpdateTasksStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref HabiticaUpdateTasksLambda
      Definition:
        StartAt: Loop Tasks
        States:
          # Loop for all tasks
          Loop Tasks:
            Type: Choice
            Default: Succeed
            Choices:
              - Variable: "$.Finished"
                BooleanEquals: false
                Next: Update Task
          # Loop start
          Update Task:
            Type: Task
            Resource: !GetAtt HabiticaUpdateTasksLambda.Arn
            Parameters:
              List.$: "$.List"
              Finished: false
            Next: Wait
          Wait:
            Type: Wait
            Seconds: 2
            Next: Loop Tasks
          # Loop end
          Succeed:
            Type: Succeed
The diagram above shows what the Step Function looks like after deploying it with SAM. As you can see, we still have to define the Lambda function for updating tasks. The setup is as simple as for the previous function: it just references the secret and the DynamoDB table.
  HabiticaUpdateTasksLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: update_tasks/
      Handler: main.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref HabiticaTaskListTable
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Ref HabiticaSecret
      Environment:
        Variables:
          HABITICA_SECRET: !Ref HabiticaSecret
          TABLE_NAME: !Ref HabiticaTaskListTable
Now we will edit the first Lambda function to allow it to trigger the Step Function and pass the list of tasks to update. Insert the following policy and environment variable into its definition. Also add the code that starts the execution at the end of lambda_handler in the main.py of that first function.
  HabiticaProcessTaskList:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: process_task_list/
      # ... cut for clarity
      Policies:
        # ... cut for clarity
        - StepFunctionsExecutionPolicy:
            StateMachineName: !GetAtt HabiticaUpdateTasksStateMachine.Name
      Environment:
        Variables:
          TABLE_NAME: !Ref HabiticaTaskListTable
          HABITICA_SECRET: !Ref HabiticaSecret
          TASK_TAG: 01234567-89ab-cdef-0123-456789abcdef
          # Despite the variable name, !Ref on a state machine returns its ARN
          STEP_FUNCTION_NAME: !Ref HabiticaUpdateTasksStateMachine
import json, os
from boto3 import client
# ... other imports

step = client('stepfunctions')
# Holds the state machine ARN, see the template above
STEP_FUNCTION_NAME = os.getenv('STEP_FUNCTION_NAME')

def lambda_handler(event, context):
    ids = []
    # ... code follows

    # If the list is empty, we don't have to even execute the Step Function
    if len(ids) > 0 and STEP_FUNCTION_NAME:
        print(f"Starting Step Function {STEP_FUNCTION_NAME} with {len(ids)} tasks.")
        step.start_execution(stateMachineArn=STEP_FUNCTION_NAME, input=json.dumps({"List": ids, "Finished": False}))

    return {
        'statusCode': 200,
        'body': {
            'List': ids,
            'Finished': len(ids) == 0
        }
    }
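The 2-second wait in the state machine follows directly from the limit of 30 calls per minute quoted earlier. The sketch below shows that arithmetic together with the input payload passed to start_execution. Both helper names are hypothetical, and the duration estimate ignores the time the Lambda itself takes, so it is a lower bound.

```python
import json
import math

RATE_LIMIT_PER_MINUTE = 30  # Habitica limit quoted earlier in this post


def wait_seconds(limit_per_minute: int = RATE_LIMIT_PER_MINUTE) -> int:
    """Smallest whole-second delay that keeps the loop within the limit."""
    return math.ceil(60 / limit_per_minute)


def build_input(ids: list[str]) -> str:
    """Serialize the Step Function input in the shape the update Lambda expects."""
    return json.dumps({"List": ids, "Finished": len(ids) == 0})


ids = ['0003', '0007']
print(build_input(ids))
print(f"Wait {wait_seconds()}s per task, at least {len(ids) * wait_seconds()}s in total")
```

Keep in mind that the batch creation request in the first Lambda counts against the same limit, so the loop has slightly less headroom than the numbers suggest.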
Final test
I deleted all the tasks in Habitica and DynamoDB. Then I uploaded the first list of tasks, followed by an update to it. The tasks were created as expected and updated in Habitica as well!
I also checked how the Step Function behaves. It correctly looped twice for the two updated tasks. Step Functions lets you look into the execution of each step and check the inputs and outputs along the way, which is very useful for debugging.
As mentioned before, this solution does not support deletion of tasks. Another idea is to connect a Git repository with CodePipeline that updates the file in S3. For a developer, such a solution would be even more convenient than uploading files to S3, even with the AWS CLI. That way you would also be able to track the history of changes.