Skip to content

Commit

Permalink
fix: subscriptions payloads _entity w/ relations fields (#2626)
Browse files Browse the repository at this point in the history
* fix: subscriptions payloads _entity w/ camelCase fileds

* fix: load subscriptions relations

* fix: sync-helper test

---------

Co-authored-by: Scott Twiname <[email protected]>
  • Loading branch information
IcanDivideBy0 and stwiname authored Jan 20, 2025
1 parent d745434 commit 89f5537
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 13 deletions.
6 changes: 3 additions & 3 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- Updated send_notification PG function to include `_block height` and entity `_id` (#2626)

### Added
- Add `TextEncoder` in sandbox, some network package util method is depended on it
- Added the `--monitor-object-max-depth` flag to mitigate OOM issues when encountering large chunks.(#2644)
- Configuration options for PG pool connections (#2646)

### Fixed
- Fixed the inconsistency between the `monitor-file-size` flag and the expected behavior.(#2644)


### Fixed
- Improved block range validation in POI endpoint with custom class-validator decorator

## [16.1.0] - 2024-12-11
Expand Down
6 changes: 3 additions & 3 deletions packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ describe('sync helper test', () => {
// For that reason the behaviour is kept the same as before delete was fixed.
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "UPDATE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "adde2f8c-cb87-4e84-9600-77f434556e6d", "block_number": 1}, "_block_height": 1, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "9396aca4-cef2-4b52-98a7-c5f1ed3edb81", "block_number": 2}, "_block_height": 2, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "9396aca4-cef2-4b52-98a7-c5f1ed3edb81", "block_number": 2}, "_block_height": 2, "mutation_type": "DELETE"}`
);
}, 10_000);
});
Expand Down
6 changes: 3 additions & 3 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ BEGIN
'mutation_type', TG_OP,
'_entity', row);
IF payload -> '_entity' ? '_block_range' then
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
payload = payload #- '{"_entity","_block_range"}';
payload = payload || jsonb_build_object('_block_height', lower(row._block_range));
IF NOT upper_inf(row._block_range) then
-- Check if a newer version of the entity exists to determine operation
-- Check if a newer version of the entity exists to determine operation
EXECUTE FORMAT(
'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))',
TG_TABLE_NAME
Expand Down
3 changes: 3 additions & 0 deletions packages/query/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Subscriptions `_entity` field now returns all properties (#2626)

## [2.19.0] - 2024-12-11
### Added
- Support for ordering with fulltext search (#2623)
Expand Down
2 changes: 1 addition & 1 deletion packages/query/src/graphql/graphql.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
path: WS_ROUTE,
});

this.wsCleanup = useServer({schema}, wsServer);
this.wsCleanup = useServer({schema, context: {pgClient: this.pgPool}}, wsServer);
}

app.use(PinoLogger(PinoConfig));
Expand Down
24 changes: 21 additions & 3 deletions packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {hashName} from '@subql/utils';
import {PgIntrospectionResultsByKind} from '@subql/x-graphile-build-pg';
import {makeExtendSchemaPlugin, gql, embed} from 'graphile-utils';
import {makeExtendSchemaPlugin, gql, embed, Resolvers} from 'graphile-utils';
import {DocumentNode} from 'graphql';

const filter = (event, args) => {
Expand All @@ -30,7 +30,7 @@ function makePayload(entityType: string): {type: DocumentNode; name: string} {
}

export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
const {inflection, pgIntrospectionResultsByKind} = build;
const {inflection, pgIntrospectionResultsByKind, pgSql: sql} = build;

const typeDefs = [
gql`
Expand All @@ -42,7 +42,7 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
`,
];

const resolvers: Record<string, any> = {};
const resolvers: Resolvers = {};

// Generate subscription fields for all database tables
(pgIntrospectionResultsByKind as PgIntrospectionResultsByKind).class.forEach((table) => {
Expand All @@ -65,6 +65,24 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
)
}`
);

resolvers[payloadName] = {
_entity: {
resolve: async ({_block_height, _entity}, args, context, resolveInfo) => {
const [row] = await resolveInfo.graphile.selectGraphQLResultFromTable(
sql.identifier(table.namespace.name, table.name),
(tableAlias, queryBuilder) => {
queryBuilder.context.args ??= {};
queryBuilder.context.args.blockHeight = sql.fragment`${sql.value(_block_height.toString())}::bigint`;
queryBuilder.where(sql.fragment`${tableAlias}._id = ${sql.value(_entity._id)}`);
queryBuilder.limit(1);
}
);

return row;
},
},
};
});

return {
Expand Down

0 comments on commit 89f5537

Please sign in to comment.