Creating a Cron Job in Node.js to Clear Out Unwanted Records from DynamoDB
In this blog we will see how to create a cron job in Node.js that deletes unnecessary records from an AWS DynamoDB table.
Let's go step by step:
Step 1)
First of all, let's create a simple cron job on a Node server. How can we create it? There are several npm packages that can run a task at a fixed time interval or at a specific time of the day.
Here are some options that can be used:
node-cron module
node-schedule module
The built-in setInterval function
In this blog we will be using the node-cron module.
Steps to use the node-cron module:
Install the node-cron module: npm install node-cron
Import the node-cron package in the index.js (main) file.
Call the schedule method provided by node-cron, passing the schedule (a time interval or a specific time of day) and the task we want to run.
The six asterisks (*) passed to the schedule method stand for second, minute, hour, day of month, month, and day of week. Together they decide when the cron job is executed.
For example:
* * * * * * : This will run the job every second.
1 * * * * * : This will run the job on the 1st second of every minute.
1 1 * * * * : The job will run on the 1st second of the 1st minute of every hour.
3 1 2 * * * : The job will run on the 3rd second of the 1st minute of 2 AM every day, that is, at 02:01:03 AM.
This way we can decide the time to run the job.
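The steps above can be sketched as follows. This is a minimal sketch, not the only way to wire it up: scheduleCleanup is a hypothetical helper name, the node-cron module is passed in as the cron argument, and the task callback is whatever job you want to run.

```javascript
// Sketch only: `cron` is expected to be the node-cron module,
// and `task` is a hypothetical callback supplied by the caller.
function scheduleCleanup(cron, task) {
  // Fields: second minute hour day-of-month month day-of-week.
  // "3 1 2 * * *" fires at 02:01:03 every day.
  const expression = "3 1 2 * * *";
  return cron.schedule(expression, task);
}

// Usage in index.js (requires `npm install node-cron`):
// const cron = require("node-cron");
// scheduleCleanup(cron, () => console.log("cleanup started"));
```

Passing the module in as an argument keeps the helper easy to try out with a stand-in object before node-cron is installed.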
Step 2)
Let's write the actual task here, which will delete the unwanted records from DynamoDB.
Suppose there are millions of users, each with a unique id, and each user has one or more tasks assigned. A task can be valid or invalid (expired). We want to delete the expired tasks of each user, and also delete the user record if the user does not have a single valid task.
This is not as straightforward as it is with SQL databases such as MySQL or PostgreSQL. There we can directly write a delete query with a condition.
In those databases we are not required to send the primary key while deleting or updating records, so the database itself finds the records that fulfill the condition and deletes or updates them.
But in DynamoDB we need to send the primary key if we want to delete a specific record.
How do we know, out of millions of records, the primary keys of the users whose records must be deleted?
For that purpose, we need to:
Fetch the records from the database.
Identify the records that fulfill the delete condition.
Pass the identified records to the request that deletes them from the database.
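To make the primary key requirement concrete, here is a rough sketch of the parameters a key-based delete needs with the low-level AWS SDK v2 client. buildDeleteParams is a hypothetical helper, and it assumes the table's partition key is a string attribute named id.

```javascript
// Builds the parameters for a key-based delete with the low-level
// AWS.DynamoDB client. A string partition key named "id" is an
// assumption made for illustration.
function buildDeleteParams(tableName, userId) {
  return {
    TableName: tableName,
    Key: { id: { S: userId } },
  };
}

// Usage with an AWS SDK v2 DynamoDB client (not executed here):
// await client.deleteItem(buildDeleteParams("User_recods", "user-1")).promise();
```

Note that there is no condition that selects rows here, only the exact key of one item, which is why the scan step comes first.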
Step 3)
Let's fetch the DynamoDB records first:
Initialize the AWS SDK and update the AWS config.
Initialize the DynamoDB instance:
const dynamdoDb = new AWS.DynamoDB({ apiVersion: "2012-08-10" });
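One possible way to do the config update and the client creation together is sketched below. createDynamoDb is a hypothetical helper, AWS is expected to be the aws-sdk (v2) module, and the default region is only an assumption; use the region of your own table.

```javascript
// Sketch only: `AWS` is expected to be the aws-sdk (v2) module.
// The default region value is an assumption for illustration.
function createDynamoDb(AWS, region = "us-east-1") {
  // Set the region globally before creating service clients.
  AWS.config.update({ region });
  return new AWS.DynamoDB({ apiVersion: "2012-08-10" });
}

// Usage (requires `npm install aws-sdk`):
// const AWS = require("aws-sdk");
// const dynamdoDb = createDynamoDb(AWS);
```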
Let's create a function that we will call later. This function should scan the DynamoDB records and then update or delete them.
const bulkUpdate = async () => {
  try {
    const tableName = "User_recods";
    const scanParams = {
      // Aliases for the attribute names used in the projection.
      ExpressionAttributeNames: {
        "#ID": "id",
        "#JB": "Jobs",
      },
      // Return only the id and Jobs attributes.
      ProjectionExpression: "#ID, #JB",
      TableName: tableName,
    };
    const response = await dynamdoDb.scan(scanParams).promise();
    // The low-level client returns typed attribute values ({ S: ... }, { L: ... }),
    // so convert each item to a plain JavaScript object.
    const result = (response.Items || []).map((item) =>
      AWS.DynamoDB.Converter.unmarshall(item)
    );
    console.log("results", result);
  } catch (e) {
    console.error(e);
  }
};
bulkUpdate();
Above we have created bulkUpdate().
Inside the scanParams object, ExpressionAttributeNames defines the aliases #ID and #JB for our attributes id and Jobs respectively. The ProjectionExpression property narrows the attributes returned from the table down to just id and Jobs.
Here dynamdoDb.scan scans the table and returns the records, which we store in the result variable and log to the console.
Now let's filter out the records from result that actually need to be deleted or updated.
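One thing to keep in mind with millions of records: a single scan call returns at most 1 MB of data and sets LastEvaluatedKey in the response when more pages remain. A minimal sketch of reading every page is shown below; scanAll is a hypothetical helper, and client is expected to be an AWS SDK v2 DynamoDB instance.

```javascript
// Collects every page of a scan. `client` is expected to be an
// AWS.DynamoDB (SDK v2) instance; `scanAll` is a hypothetical helper name.
async function scanAll(client, scanParams) {
  const items = [];
  let lastKey;
  do {
    // Continue from where the previous page stopped, if there was one.
    const params = lastKey
      ? { ...scanParams, ExclusiveStartKey: lastKey }
      : scanParams;
    const page = await client.scan(params).promise();
    items.push(...(page.Items || []));
    // LastEvaluatedKey is absent on the final page.
    lastKey = page.LastEvaluatedKey;
  } while (lastKey);
  return items;
}
```

For a very large table it may be better to process each page as it arrives instead of collecting everything in memory first.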
let updatedUser = [];
result.forEach((userDetail) => {
  // Keep only the jobs that are still active.
  const validJobs = userDetail.Jobs.filter((job) => job.validity === "ACTIVE");
  // Only users that lost at least one job need an update.
  if (userDetail.Jobs.length > validJobs.length) {
    userDetail.Jobs = validJobs;
    updatedUser.push(userDetail);
  }
});
Here we loop over the results received from DynamoDB and filter each user's jobs so that only active jobs remain. Users that had at least one inactive job are stored in the updatedUser array.
Now we can use the updatedUser array to update or delete only those records.
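Since the goal also includes deleting users that have no valid job left, the same loop can separate the two cases. The following is a sketch under the assumption that each user is a plain object shaped like { id, Jobs: [{ validity, ... }] }; partitionUsers is a hypothetical helper name.

```javascript
// Splits scanned users into those to update and those to delete.
// Assumes plain objects shaped like { id, Jobs: [{ validity, ... }] };
// `partitionUsers` is a hypothetical helper name.
function partitionUsers(users) {
  const toUpdate = [];
  const toDelete = [];
  for (const user of users) {
    const jobs = user.Jobs || [];
    const activeJobs = jobs.filter((job) => job.validity === "ACTIVE");
    if (activeJobs.length === 0) {
      // No valid job left: the whole user record can go.
      toDelete.push(user.id);
    } else if (activeJobs.length < jobs.length) {
      // Some jobs expired: keep the user, rewrite the Jobs list.
      toUpdate.push({ ...user, Jobs: activeJobs });
    }
  }
  return { toUpdate, toDelete };
}
```

Users whose jobs are all active appear in neither list, so no write is issued for them.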
Now that we have all the items that need to be updated, we will use DynamoDB's transaction API, specifically the transactWriteItems method, to update multiple items at once.
We can use transactWriteItems to update multiple items in the following way.
await dynamdoDb.transactWriteItems(transactWriteParams).promise();
As you can see above, the transactWriteItems method needs the list of operations in a specific format. Here, transactWriteParams provides those details to the transactWriteItems method.
Now let's define transactWriteParams.
let transactItems = [];
const transactWriteParams = {
  TransactItems: [...transactItems],
};
How do we build transactItems here?
We can loop over the updatedUser array above and add one entry per user to transactItems.
for (const user of updatedUser) {
  transactItems.push({
    Update: {
      TableName: tableName,
      Key: {
        id: { S: user.id },
      },
      ExpressionAttributeNames: {
        "#JB": "Jobs",
      },
      ExpressionAttributeValues: {
        // Jobs is a list, so build one map entry per remaining active job.
        ":st_valid": {
          L: user.Jobs.map((job) => ({
            M: {
              JobId: { S: job.Id },
              JobName: { S: job.Name },
              validity: { S: job.validity },
            },
          })),
        },
      },
      UpdateExpression: "set #JB = :st_valid",
      ReturnValuesOnConditionCheckFailure: "ALL_OLD",
    },
  });
}
const transactWriteParams = {
  TransactItems: [...transactItems],
};
Here every user has a unique id and a Jobs list. Each job in the list includes a job id, a job name, and a job validity.
We first define an empty array called transactItems. Then we loop through each user in updatedUser and construct an object for each one of them. Each of these objects has an Update object in it.
We define ExpressionAttributeValues, which gives us a way to substitute values for our Jobs property that we can then use in expressions within the object. Here, we define the value :st_valid and set it to the user's remaining active jobs.
We repeat this process for every user in updatedUser and add all the corresponding objects to the transactItems array, which we then pass inside the transactWriteParams object.
Once the transactWriteParams object is ready, we can pass it to transactWriteItems.
await dynamdoDb.transactWriteItems(transactWriteParams).promise();
This updates all the users in one transaction so that each keeps only its active jobs. In other words, we are deleting all the inactive jobs from all users.
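A single transactWriteItems call accepts a limited number of actions (100 per request according to the current DynamoDB documentation), so with a large updatedUser array the items need to be sent in batches. A minimal sketch of the batching is below; chunk is a hypothetical helper, and the default size of 100 is an assumption you may need to adjust.

```javascript
// Splits an array into consecutive batches of at most `size` elements.
// `chunk` is a hypothetical helper; 100 is assumed to be the
// per-transaction limit and may need adjusting.
function chunk(items, size = 100) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Usage with the transactItems array from this post (not executed here):
// for (const batch of chunk(transactItems)) {
//   await dynamdoDb.transactWriteItems({ TransactItems: batch }).promise();
// }
```

Each batch is its own transaction, so the update is all-or-nothing within a batch but not across batches.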