Skip to content

Commit

Permalink
Merge pull request #10873 from hicommonwealth/tim/twitter-poller
Browse files Browse the repository at this point in the history
  • Loading branch information
timolegros authored Feb 12, 2025
2 parents b39c4f3 + 016f125 commit bcc9731
Show file tree
Hide file tree
Showing 20 changed files with 404 additions and 4 deletions.
1 change: 1 addition & 0 deletions libs/adapters/src/utils/startHealthCheckLoop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export enum ServiceKey {
CommonwealthConsumer = 'commonwealth-consumer',
MessageRelayer = 'message-relayer',
DiscordBotListener = 'discord-bot-listener',
TwitterWorker = 'twitter-worker',
}

export type HealthCheckOptions = {
Expand Down
2 changes: 2 additions & 0 deletions libs/core/src/integration/outbox.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export const outboxEvents: Events[] = [
'UserMentioned',
'QuestStarted',
'AddressOwnershipTransferred',
'TwitterMomBotMentioned',
'TwitterContestBotMentioned',
] as const;

export const Outbox = z.union(
Expand Down
3 changes: 0 additions & 3 deletions libs/core/src/ports/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ export interface Stats extends Disposable {
export enum CacheNamespaces {
Route_Response = 'route_response',
Function_Response = 'function_response',
Global_Response = 'global_response',
Test_Redis = 'test_redis',
Database_Cleaner = 'database_cleaner',
Compound_Gov_Version = 'compound_gov_version',
Token_Balance = 'token_balance',
Activity_Cache = 'activity_cache',
Rate_Limiter = 'rate_limiter',
Api_key_auth = 'api_key_auth',
Query_Response = 'query_response',
Expand Down
2 changes: 2 additions & 0 deletions libs/model/src/models/factories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import ThreadSubscription from './thread_subscriptions';
import ThreadVersionHistory from './thread_version_history';
import LaunchpadToken from './token';
import Topic from './topic';
import TwitterCursor from './twitter_cursor';
import User from './user';
import Vote from './vote';
import Wallets from './wallets';
Expand Down Expand Up @@ -89,6 +90,7 @@ export const Factories = {
ThreadVersionHistory,
ThreadSubscription,
Topic,
TwitterCursor,
User,
Vote,
Webhook,
Expand Down
1 change: 1 addition & 0 deletions libs/model/src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export * from './tags';
export * from './thread';
export * from './thread_version_history';
export * from './topic';
export * from './twitter_cursor';
export * from './types';
export * from './user';
export * from './vote';
Expand Down
29 changes: 29 additions & 0 deletions libs/model/src/models/twitter_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { TwitterCursor } from '@hicommonwealth/schemas';
import Sequelize from 'sequelize';
import { z } from 'zod';
import { ModelInstance } from './types';

export type TwitterCursorAttributes = z.infer<typeof TwitterCursor>;

export type TwitterCursorInstance = ModelInstance<TwitterCursorAttributes>;

export default (
sequelize: Sequelize.Sequelize,
): Sequelize.ModelStatic<TwitterCursorInstance> =>
sequelize.define<TwitterCursorInstance>(
'TwitterCursor',
{
bot_name: {
type: Sequelize.STRING,
primaryKey: true,
},
last_polled_timestamp: {
type: Sequelize.BIGINT,
allowNull: false,
},
},
{
timestamps: false,
tableName: 'TwitterCursors',
},
);
1 change: 1 addition & 0 deletions libs/schemas/src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './stake.schemas';
export * from './tag.schemas';
export * from './thread.schemas';
export * from './topic.schemas';
export * from './twitter-cursors.schemas';
export * from './user.schemas';
export * from './wallets.schemas';
export * from './webhook.schemas';
Expand Down
7 changes: 7 additions & 0 deletions libs/schemas/src/entities/twitter-cursors.schemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { TwitterBotName } from '@hicommonwealth/shared';
import { z } from 'zod';

export const TwitterCursor = z.object({
bot_name: z.nativeEnum(TwitterBotName),
last_polled_timestamp: z.number(),
});
4 changes: 4 additions & 0 deletions libs/schemas/src/events/events.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { FarcasterAction } from '../entities/farcaster.schemas';
import { SubscriptionPreference } from '../entities/notification.schemas';
import { Reaction } from '../entities/reaction.schemas';
import { Thread } from '../entities/thread.schemas';
import { Tweet } from '../integrations';
import { PG_INT } from '../utils';
import {
CommunityStakeTrade,
Expand Down Expand Up @@ -378,4 +379,7 @@ export const events = {
end_date: z.coerce.date(),
community_id: z.string().nullish(),
}),

TwitterMomBotMentioned: Tweet,
TwitterContestBotMentioned: Tweet,
} as const;
1 change: 1 addition & 0 deletions libs/schemas/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export * from './commands';
export * from './context';
export * from './entities';
export * from './events';
export * from './integrations';
export * from './projections';
export * from './queries';
export * from './utils';
1 change: 1 addition & 0 deletions libs/schemas/src/integrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './twitter';
41 changes: 41 additions & 0 deletions libs/schemas/src/integrations/twitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { z } from 'zod';

export const Tweet = z.object({
id: z.string(),
author_id: z.string(),
username: z.string(),
created_at: z.string(),
text: z.string().describe('The first 280 characters of the tweet'),
note_tweet: z
.string()
.optional()
.describe('The full tweet text including anything above 280 characters'),
conversation_id: z.string().optional(),
reply_settings: z
.enum([
'everyone',
'mentionedUsers',
'following',
'other',
'subscribers',
'verified',
])
.optional(),
});

export const TwitterMentionsTimeline = z.object({
data: z.array(Tweet),
errors: z.array(
z.object({
title: z.string(),
type: z.string(),
detail: z.string().optional(),
status: z.number().optional(),
}),
),
meta: z
.object({
next_token: z.string().optional(),
})
.optional(),
});
9 changes: 8 additions & 1 deletion libs/shared/src/types/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ThresholdData } from './protocol';

export const Roles = ['admin', 'moderator', 'member'] as const;
export type Role = typeof Roles[number];
export type Role = (typeof Roles)[number];

export type AddressRole = {
address: string;
Expand Down Expand Up @@ -70,3 +70,10 @@ export type Link = {
identifier: string;
title?: string;
};

// These are meant to be static and Common focused
// i.e. these should not change even if the underlying Twitter account changes
export enum TwitterBotName {
MomBot = 'MomBot',
ContestBot = 'ContestBot',
}
1 change: 1 addition & 0 deletions packages/commonwealth/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"start-evm-ce": "tsx server/workers/evmChainEvents/startEvmPolling.ts",
"start-knock": "tsx server/workers/knock/knockWorker.ts",
"start-message-relayer": "tsx ./server/workers/messageRelayer/messageRelayer.ts",
"start-twitter": "tsx server/workers/twitterWorker/twitterWorker.ts",
"stylelint": "stylelint client/styles/*",
"test": "pnpm test-unit",
"test-api": "NODE_ENV=test vitest --config ../../vite.config.ts --coverage run ./test/integration/api",
Expand Down
15 changes: 15 additions & 0 deletions packages/commonwealth/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
DEV_MODULITH,
ENABLE_CLIENT_PUBLISHING,
EVM_CE_LOG_TRACE,
TWITTER_WORKER_POLL_INTERVAL,
} = process.env;

const DEFAULTS = {
Expand All @@ -38,6 +39,8 @@ const DEFAULTS = {
MESSAGE_RELAYER_TIMEOUT_MS: '200',
MESSAGE_RELAYER_PREFETCH: '50',
EVM_CE_POLL_INTERVAL: '120000',
// 16 minutes -> 15 minute rate limit window + 1 minute buffer to account for func execution time
TWITTER_WORKER_POLL_INTERVAL: 16 * 60 * 1000,
};

export const config = configure(
Expand Down Expand Up @@ -107,6 +110,15 @@ export const config = configure(
},
DEV_MODULITH: DEV_MODULITH === 'true',
ENABLE_CLIENT_PUBLISHING: ENABLE_CLIENT_PUBLISHING === 'true',
TWITTER: {
WORKER_POLL_INTERVAL: (() => {
if (TWITTER_WORKER_POLL_INTERVAL)
return parseInt(TWITTER_WORKER_POLL_INTERVAL, 10);
else if (model_config.APP_ENV === 'local')
return DEFAULTS.TWITTER_WORKER_POLL_INTERVAL;
else return 0;
})(),
},
},
z.object({
NO_GLOBAL_ACTIVITY_CACHE: z.boolean(),
Expand Down Expand Up @@ -191,5 +203,8 @@ export const config = configure(
}),
DEV_MODULITH: z.boolean(),
ENABLE_CLIENT_PUBLISHING: z.boolean(),
TWITTER: z.object({
WORKER_POLL_INTERVAL: z.number().int().gte(0),
}),
}),
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface, Sequelize) {
return queryInterface.createTable('TwitterCursors', {
bot_name: {
type: Sequelize.STRING,
primaryKey: true,
},
last_polled_timestamp: {
type: Sequelize.BIGINT,
allowNull: false,
},
});
},

async down(queryInterface, Sequelize) {
return queryInterface.dropTable('TwitterCursors');
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async function main() {
isServiceHealthy = true;
bootstrapContestRolloverLoop();
} catch (error) {
isServiceHealthy = false;
log.fatal('Consumer setup failed', error);
}
}
Expand Down
94 changes: 94 additions & 0 deletions packages/commonwealth/server/workers/twitterWorker/pollTwitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { logger } from '@hicommonwealth/core';
import { Tweet, TwitterMentionsTimeline } from '@hicommonwealth/schemas';
import fetch from 'node-fetch';
import z from 'zod';
import { TwitterBotConfig } from './utils';

const log = logger(import.meta);

async function getFromTwitter({
twitterBotConfig,
url,
queryParams,
}: {
twitterBotConfig: TwitterBotConfig;
url: string;
queryParams: Record<string, string | Date | number>;
}): Promise<{ jsonBody: Record<string, unknown>; requestsRemaining: number }> {
const parsedQueryParams: Record<string, string> = Object.fromEntries(
Object.entries(queryParams).map(([key, value]) => [
key,
value instanceof Date ? value.toISOString() : value.toString(),
]),
);

const queryString = new URLSearchParams(parsedQueryParams).toString();
const fullUrl = `${url}?${queryString}`;

const response = await fetch(fullUrl, {
method: 'GET',
headers: {
Authorization: `Bearer ${twitterBotConfig.bearerToken}`,
'Content-Type': 'application/json',
},
});

if (!response.ok) {
throw new Error(
`Request failed with status ${response.status}: ${response.statusText}`,
);
}

return {
jsonBody: await response.json(),
requestsRemaining: Number(response.headers.get('x-rate-limit-remaining')),
};
}

// https://docs.x.com/x-api/posts/user-mention-timeline-by-user-id
export async function getMentions({
twitterBotConfig,
startTime,
endTime,
}: {
twitterBotConfig: TwitterBotConfig;
startTime: Date;
endTime: Date;
}): Promise<{ mentions: z.infer<typeof Tweet>[]; endTime: Date }> {
const allMentions: z.infer<typeof Tweet>[] = [];
let paginationToken: string | undefined;
let requestsRemaining: number;
do {
const res = await getFromTwitter({
twitterBotConfig,
url: `https://api.x.com/2/users/${twitterBotConfig.twitterUserId}/mentions`,
queryParams: {
start_time: startTime,
end_time: endTime,
},
});
const parsedRes = TwitterMentionsTimeline.parse(res.jsonBody);
paginationToken = parsedRes.meta?.next_token;
requestsRemaining = res.requestsRemaining;

for (const error of parsedRes.errors) {
log.error(
'Error occurred polling for Twitter mentions',
new Error(JSON.stringify(error)),
{
botName: twitterBotConfig.name,
},
);
}
allMentions.push(...parsedRes.data);
} while (paginationToken && requestsRemaining > 0);

if (paginationToken && requestsRemaining === 0 && allMentions.length > 0) {
return {
mentions: allMentions,
endTime: new Date(allMentions.at(-1)!.created_at),
};
}

return { mentions: allMentions, endTime };
}
Loading

0 comments on commit bcc9731

Please sign in to comment.