Skip to content

Commit

Permalink
feat: add aws, compress
Browse files Browse the repository at this point in the history
  • Loading branch information
willfarrell committed Aug 28, 2022
1 parent c1774a4 commit f1daea2
Show file tree
Hide file tree
Showing 62 changed files with 8,603 additions and 5,785 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"packages": ["packages/*"],
"useNx": false,
"version": "0.0.3"
"version": "0.0.5"
}
8,305 changes: 3,222 additions & 5,083 deletions package-lock.json

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@datastream/monorepo",
"version": "0.0.3",
"version": "0.0.4",
"description": "Streams made easy.",
"type": "module",
"engines": {
Expand All @@ -12,7 +12,7 @@
"commit-msg": "commitlint --config commitlint.config.cjs --edit",
"pre-commit": "lint-staged",
"install": "lerna bootstrap",
"lint": "lint-staged",
"lint": "prettier --write packages/*/*.js && standard --fix packages/*/*.js",
"test": "npm run build && c8 node --test --conditions=node packages && c8 node --test --conditions=webstream packages",
"build": "bin/esbuild",
"release:tag": "git tag $npm_package_version && git push --tags",
Expand All @@ -23,8 +23,8 @@
"lerna:outdated": "lerna exec --concurrency 5 npm outdated",
"lerna:audit": "lerna exec --concurrency 2 npm audit fix",
"lerna:sync": "lerna publish --exact --yes --skip-npm --skip-git --repo-version $npm_package_version",
"lerna:publish": "lerna publish --exact --yes --skip-git --repo-version $npm_package_version",
"lerna:publish:next": "lerna publish --exact --yes --skip-git --repo-version $npm_package_version --dist-tag next"
"lerna:publish": "npm run build && lerna publish --exact --yes --skip-git --repo-version $npm_package_version",
"lerna:publish:next": "npm run build && lerna publish --exact --yes --skip-git --repo-version $npm_package_version --dist-tag next"
},
"repository": {
"type": "git",
Expand All @@ -48,14 +48,13 @@
},
"homepage": "https://middy.js.org",
"devDependencies": {
"@commitlint/cli": "^17.0.0",
"@commitlint/config-conventional": "^17.0.0",
"ava": "4.3.1",
"@commitlint/cli": "^17.1.1",
"@commitlint/config-conventional": "^17.1.0",
"benchmark": "^2.1.4",
"c8": "7.12.0",
"esbuild": "0.14.49",
"c8": "^7.12.0",
"esbuild": "^0.15.5",
"husky": "^8.0.0",
"lerna": "^5.1.8",
"lerna": "^5.4.3",
"lint-staged": "^13.0.0",
"prettier": "^2.0.0",
"sinon": "^14.0.0",
Expand Down
177 changes: 177 additions & 0 deletions packages/aws/dynamodb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import { createWritableStream, timeout } from '@datastream/core'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'

import { Agent } from 'node:https'
import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
import AWSXRay from 'aws-xray-sdk-core'
const {
BatchGetCommand,
BatchWriteCommand,
QueryCommand,
ScanCommand,
DynamoDBDocumentClient
} = require('@aws-sdk/lib-dynamodb')

const awsClientDefaults = {
requestHandler: new NodeHttpHandler({
httpsAgent: new Agent({
keepAlive: true,
secureProtocol: 'TLSv1_2_method'
})
}),
useFipsEndpoint: [
'us-east-1',
'us-east-2',
'us-west-1',
'us-west-2',
'ca-central-1'
].includes(process.env.AWS_REGION)
}

let dynamodb = AWSXRay.captureAWSv3Client(
new DynamoDBClient(awsClientDefaults)
)
let dynamodbDocument = DynamoDBDocumentClient.from(dynamodb)
export const awsDynamoDBSetClient = (client) => {
dynamodb = client
dynamodbDocument = DynamoDBDocumentClient.from(dynamodb)
}

// Docs: https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html

// options = {TableName, ...}

export const awsDynamoDBQueryStream = (options, streamOptions) => {
async function * command (options) {
let count = 1
while (count) {
const response = await dynamodbDocument.send(new QueryCommand(options))
options.ExclusiveStartKey = response.LastEvaluatedKey
count = response.Count
for (const item of response.Items) {
yield item
}
}
}
return command(options)
}

export const awsDynamoDBScanStream = async (options, streamOptions) => {
async function * command (options) {
let count = 1
while (count) {
const response = await dynamodbDocument.send(new ScanCommand(options))
options.ExclusiveStartKey = response.LastEvaluatedKey
count = response.Count
for (const item of response.Items) {
yield item
}
}
}
return command(options)
}

// max Keys.length = 100
export const awsDynamoDBGetStream = async (options, streamOptions) => {
options.retryCount ??= 0
options.retryMaxCount ??= 10
async function * command (options) {
while (true) {
const response = await dynamodbDocument.send(
new BatchGetCommand({
RequestItems: {
[options.TableName]: options
}
})
)
for (const item of response.Responses[options.TableName]) {
yield item
}
const UnprocessedKeys =
response?.UnprocessedKeys?.[options.TableName]?.Keys ?? []

if (!UnprocessedKeys.length) {
break
}
if (options.retryCount >= options.retryMaxCount) {
throw new Error('awsDynamoDBBatchGet has UnprocessedKeys', {
cause: {
...options,
UnprocessedKeysCount: UnprocessedKeys.length
}
})
}

await timeout(3 ** options.retryCount++) // 3^10 == 59sec

options.Keys = UnprocessedKeys
}
}
return command(options)
}

export const awsDynamoDBPutItemStream = (options, streamOptions) => {
options.retryCount ??= 0
options.retryMaxCount ??= 10
let batch = []
const write = async (chunk) => {
if (batch.length === 10) {
await dynamodbBatchWrite(options, batch, streamOptions)
batch = []
}
batch.push({
PutRequest: {
Item: chunk
}
})
}
streamOptions.final = () => dynamodbBatchWrite(options, batch, streamOptions)
return createWritableStream(write, streamOptions)
}

export const awsDynamoDBDeleteItemStream = (options, streamOptions) => {
options.retryCount ??= 0
options.retryMaxCount ??= 10
let batch = []
const write = async (chunk) => {
if (batch.length === 10) {
await dynamodbBatchWrite(options, batch, options)
batch = []
}
batch.push({
DeleteRequest: {
Key: chunk
}
})
}
streamOptions.final = () => dynamodbBatchWrite(options, batch, options)
return createWritableStream(write, streamOptions)
}

const dynamodbBatchWrite = async (options, batch, streamOptions) => {
const { UnprocessedItems } = await dynamodbDocument.send(
new BatchWriteCommand({
RequestItems: {
[options.TableName]: batch
}
})
)
if (UnprocessedItems.length) {
if (options.retryCount >= options.retryMaxCount) {
throw new Error('awsDynamoDBBatchWrite has UnprocessedItems', {
cause: {
...options,
UnprocessedItemsCount: UnprocessedItems.length
}
})
}

await timeout(3 ** options.retryCount++) // 3^10 == 59sec
return dynamodbBatchWrite(
options,
UnprocessedItems[options.TableName],
options
)
}
options.retryCount = 0 // reset for next batch
}
Loading

0 comments on commit f1daea2

Please sign in to comment.