Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Twitter Poller #10873

Merged
merged 21 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/adapters/src/utils/startHealthCheckLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export enum ServiceKey {
CommonwealthConsumer = 'commonwealth-consumer',
MessageRelayer = 'message-relayer',
DiscordBotListener = 'discord-bot-listener',
TwitterWorker = 'twitter-worker',
}

export type HealthCheckOptions = {
Expand Down
2 changes: 2 additions & 0 deletions libs/core/src/integration/outbox.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export const outboxEvents: Events[] = [
'UserMentioned',
'QuestStarted',
'AddressOwnershipTransferred',
'TwitterMomBotMentioned',
'TwitterContestBotMentioned',
] as const;

export const Outbox = z.union(
Expand Down
3 changes: 0 additions & 3 deletions libs/core/src/ports/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ export interface Stats extends Disposable {
export enum CacheNamespaces {
Route_Response = 'route_response',
Function_Response = 'function_response',
Global_Response = 'global_response',
Test_Redis = 'test_redis',
Database_Cleaner = 'database_cleaner',
Compound_Gov_Version = 'compound_gov_version',
Token_Balance = 'token_balance',
Activity_Cache = 'activity_cache',
Rate_Limiter = 'rate_limiter',
Api_key_auth = 'api_key_auth',
Query_Response = 'query_response',
Expand Down
2 changes: 2 additions & 0 deletions libs/model/src/models/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import ThreadSubscription from './thread_subscriptions';
import ThreadVersionHistory from './thread_version_history';
import LaunchpadToken from './token';
import Topic from './topic';
import TwitterCursor from './twitter_cursor';
import User from './user';
import Vote from './vote';
import Wallets from './wallets';
Expand Down Expand Up @@ -89,6 +90,7 @@ export const Factories = {
ThreadVersionHistory,
ThreadSubscription,
Topic,
TwitterCursor,
User,
Vote,
Webhook,
Expand Down
1 change: 1 addition & 0 deletions libs/model/src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export * from './tags';
export * from './thread';
export * from './thread_version_history';
export * from './topic';
export * from './twitter_cursor';
export * from './types';
export * from './user';
export * from './vote';
Expand Down
29 changes: 29 additions & 0 deletions libs/model/src/models/twitter_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { TwitterCursor } from '@hicommonwealth/schemas';
import Sequelize from 'sequelize';
import { z } from 'zod';
import { ModelInstance } from './types';

export type TwitterCursorAttributes = z.infer<typeof TwitterCursor>;

export type TwitterCursorInstance = ModelInstance<TwitterCursorAttributes>;

export default (
sequelize: Sequelize.Sequelize,
): Sequelize.ModelStatic<TwitterCursorInstance> =>
sequelize.define<TwitterCursorInstance>(
'TwitterCursor',
{
bot_name: {
type: Sequelize.STRING,
primaryKey: true,
},
last_polled_timestamp: {
type: Sequelize.BIGINT,
allowNull: false,
},
},
{
timestamps: false,
tableName: 'TwitterCursors',
},
);
1 change: 1 addition & 0 deletions libs/schemas/src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './stake.schemas';
export * from './tag.schemas';
export * from './thread.schemas';
export * from './topic.schemas';
export * from './twitter-cursors.schemas';
export * from './user.schemas';
export * from './wallets.schemas';
export * from './webhook.schemas';
Expand Down
7 changes: 7 additions & 0 deletions libs/schemas/src/entities/twitter-cursors.schemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { TwitterBotName } from '@hicommonwealth/shared';
import { z } from 'zod';

export const TwitterCursor = z.object({
bot_name: z.nativeEnum(TwitterBotName),
last_polled_timestamp: z.number(),
});
4 changes: 4 additions & 0 deletions libs/schemas/src/events/events.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { FarcasterAction } from '../entities/farcaster.schemas';
import { SubscriptionPreference } from '../entities/notification.schemas';
import { Reaction } from '../entities/reaction.schemas';
import { Thread } from '../entities/thread.schemas';
import { Tweet } from '../integrations';
import { PG_INT } from '../utils';
import {
CommunityStakeTrade,
Expand Down Expand Up @@ -378,4 +379,7 @@ export const events = {
end_date: z.coerce.date(),
community_id: z.string().nullish(),
}),

TwitterMomBotMentioned: Tweet,
TwitterContestBotMentioned: Tweet,
} as const;
1 change: 1 addition & 0 deletions libs/schemas/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export * from './commands';
export * from './context';
export * from './entities';
export * from './events';
export * from './integrations';
export * from './projections';
export * from './queries';
export * from './utils';
1 change: 1 addition & 0 deletions libs/schemas/src/integrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './twitter';
41 changes: 41 additions & 0 deletions libs/schemas/src/integrations/twitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { z } from 'zod';

export const Tweet = z.object({
id: z.string(),
author_id: z.string(),
username: z.string(),
created_at: z.string(),
text: z.string().describe('The first 280 characters of the tweet'),
note_tweet: z
.string()
.optional()
.describe('The full tweet text including anything above 280 characters'),
conversation_id: z.string().optional(),
reply_settings: z
.enum([
'everyone',
'mentionedUsers',
'following',
'other',
'subscribers',
'verified',
])
.optional(),
});

export const TwitterMentionsTimeline = z.object({
data: z.array(Tweet),
errors: z.array(
z.object({
title: z.string(),
type: z.string(),
detail: z.string().optional(),
status: z.number().optional(),
}),
),
meta: z
.object({
next_token: z.string().optional(),
})
.optional(),
});
9 changes: 8 additions & 1 deletion libs/shared/src/types/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ThresholdData } from './protocol';

export const Roles = ['admin', 'moderator', 'member'] as const;
export type Role = typeof Roles[number];
export type Role = (typeof Roles)[number];

export type AddressRole = {
address: string;
Expand Down Expand Up @@ -70,3 +70,10 @@ export type Link = {
identifier: string;
title?: string;
};

// These are meant to be static and Common focused
// i.e. these should not change even if the underlying Twitter account changes
export enum TwitterBotName {
MomBot = 'MomBot',
ContestBot = 'ContestBot',
}
1 change: 1 addition & 0 deletions packages/commonwealth/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"start-evm-ce": "tsx server/workers/evmChainEvents/startEvmPolling.ts",
"start-knock": "tsx server/workers/knock/knockWorker.ts",
"start-message-relayer": "tsx ./server/workers/messageRelayer/messageRelayer.ts",
"start-twitter": "tsx server/workers/twitterWorker/twitterWorker.ts",
"stylelint": "stylelint client/styles/*",
"test": "pnpm test-unit",
"test-api": "NODE_ENV=test vitest --config ../../vite.config.ts --coverage run ./test/integration/api",
Expand Down
15 changes: 15 additions & 0 deletions packages/commonwealth/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
DEV_MODULITH,
ENABLE_CLIENT_PUBLISHING,
EVM_CE_LOG_TRACE,
TWITTER_WORKER_POLL_INTERVAL,
} = process.env;

const DEFAULTS = {
Expand All @@ -38,6 +39,8 @@ const DEFAULTS = {
MESSAGE_RELAYER_TIMEOUT_MS: '200',
MESSAGE_RELAYER_PREFETCH: '50',
EVM_CE_POLL_INTERVAL: '120000',
// 16 minutes -> 15 minute rate limit window + 1 minute buffer to account for func execution time
TWITTER_WORKER_POLL_INTERVAL: 16 * 60 * 1000,
};

export const config = configure(
Expand Down Expand Up @@ -107,6 +110,15 @@ export const config = configure(
},
DEV_MODULITH: DEV_MODULITH === 'true',
ENABLE_CLIENT_PUBLISHING: ENABLE_CLIENT_PUBLISHING === 'true',
TWITTER: {
WORKER_POLL_INTERVAL: (() => {
if (TWITTER_WORKER_POLL_INTERVAL)
return parseInt(TWITTER_WORKER_POLL_INTERVAL, 10);
else if (model_config.APP_ENV === 'local')
return DEFAULTS.TWITTER_WORKER_POLL_INTERVAL;
else return 0;
})(),
},
},
z.object({
NO_GLOBAL_ACTIVITY_CACHE: z.boolean(),
Expand Down Expand Up @@ -191,5 +203,8 @@ export const config = configure(
}),
DEV_MODULITH: z.boolean(),
ENABLE_CLIENT_PUBLISHING: z.boolean(),
TWITTER: z.object({
WORKER_POLL_INTERVAL: z.number().int().gte(0),
}),
}),
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface, Sequelize) {
return queryInterface.createTable('TwitterCursors', {
bot_name: {
type: Sequelize.STRING,
primaryKey: true,
},
last_polled_timestamp: {
type: Sequelize.BIGINT,
allowNull: false,
},
});
},

async down(queryInterface, Sequelize) {
return queryInterface.dropTable('TwitterCursors');
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async function main() {
isServiceHealthy = true;
bootstrapContestRolloverLoop();
} catch (error) {
isServiceHealthy = false;
log.fatal('Consumer setup failed', error);
}
}
Expand Down
94 changes: 94 additions & 0 deletions packages/commonwealth/server/workers/twitterWorker/pollTwitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { logger } from '@hicommonwealth/core';
import { Tweet, TwitterMentionsTimeline } from '@hicommonwealth/schemas';
import fetch from 'node-fetch';
import z from 'zod';
import { TwitterBotConfig } from './utils';

const log = logger(import.meta);

async function getFromTwitter({
twitterBotConfig,
url,
queryParams,
}: {
twitterBotConfig: TwitterBotConfig;
url: string;
queryParams: Record<string, string | Date | number>;
}): Promise<{ jsonBody: Record<string, unknown>; requestsRemaining: number }> {
const parsedQueryParams: Record<string, string> = Object.fromEntries(
Object.entries(queryParams).map(([key, value]) => [
key,
value instanceof Date ? value.toISOString() : value.toString(),
]),
);

const queryString = new URLSearchParams(parsedQueryParams).toString();
const fullUrl = `${url}?${queryString}`;

const response = await fetch(fullUrl, {
method: 'GET',
headers: {
Authorization: `Bearer ${twitterBotConfig.bearerToken}`,
'Content-Type': 'application/json',
},
});

if (!response.ok) {
throw new Error(
`Request failed with status ${response.status}: ${response.statusText}`,
);
}
rbennettcw marked this conversation as resolved.
Show resolved Hide resolved

return {
jsonBody: await response.json(),
requestsRemaining: Number(response.headers.get('x-rate-limit-remaining')),
};
}

// https://docs.x.com/x-api/posts/user-mention-timeline-by-user-id
export async function getMentions({
twitterBotConfig,
startTime,
endTime,
}: {
twitterBotConfig: TwitterBotConfig;
startTime: Date;
endTime: Date;
}): Promise<{ mentions: z.infer<typeof Tweet>[]; endTime: Date }> {
const allMentions: z.infer<typeof Tweet>[] = [];
let paginationToken: string | undefined;
let requestsRemaining: number;
do {
const res = await getFromTwitter({
twitterBotConfig,
url: `https://api.x.com/2/users/${twitterBotConfig.twitterUserId}/mentions`,
queryParams: {
start_time: startTime,
end_time: endTime,
},
});
const parsedRes = TwitterMentionsTimeline.parse(res.jsonBody);
paginationToken = parsedRes.meta?.next_token;
requestsRemaining = res.requestsRemaining;

for (const error of parsedRes.errors) {
log.error(
'Error occurred polling for Twitter mentions',
new Error(JSON.stringify(error)),
{
botName: twitterBotConfig.name,
},
);
}
allMentions.push(...parsedRes.data);
} while (paginationToken && requestsRemaining > 0);

if (paginationToken && requestsRemaining === 0 && allMentions.length > 0) {
return {
mentions: allMentions,
endTime: new Date(allMentions.at(-1)!.created_at),
};
}

return { mentions: allMentions, endTime };
}
Loading
Loading