Skip to content

Commit

Permalink
Get channel info on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
Inkvi committed Mar 12, 2024
1 parent eb31e5c commit 217fb8e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 97 deletions.
185 changes: 88 additions & 97 deletions app/api/packets/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { NextRequest, NextResponse } from 'next/server';
import { Packet, PacketStates } from 'utils/types/packet';
import { CHAIN, CHAIN_CONFIGS } from 'utils/chains/configs';
import { CachingJsonRpcProvider } from 'api/utils/provider-cache';
import { getCacheTTL, SimpleCache } from 'api/utils/cosmos';
import { getCacheTTL, GetTmClient, SimpleCache } from 'api/utils/cosmos';
import { IdentifiedChannel } from 'cosmjs-types/ibc/core/channel/v1/channel';
import Abi from 'utils/dispatcher.json';

Expand All @@ -22,126 +22,101 @@ export async function GET(request: NextRequest) {
return NextResponse.json(allPackets);
}

const channelsResponse = await getChannelsConcurrently();
if (!channelsResponse) {
return NextResponse.error();
}
const channels = channelsResponse as Array<IdentifiedChannel>;
const openChannels = channels.filter((channel) => {
return (
channel.portId.startsWith(`polyibc.`) &&
channel.counterparty.portId.startsWith(`polyibc.`)
);
});
if (openChannels.length === 0) {
return NextResponse.json([]);
}

const validChannelIds = new Set<string>();
openChannels.forEach((channel) => {
validChannelIds.add(channel.channelId);
});

let sendLogs: Array<[ethers.EventLog, CHAIN]> = [];
let sendLogs: Array<[ethers.EventLog, CHAIN, string]> = [];
let srcChainProviders: Record<CHAIN, CachingJsonRpcProvider> = {} as Record<CHAIN, CachingJsonRpcProvider>;
let srcChainContracts: Array<[ethers.Contract, CHAIN, number]> = [];
let srcChainContracts: Array<[ethers.Contract, CHAIN, number, string]> = [];
for (const chain in CHAIN_CONFIGS) {
const chainId = chain as CHAIN;
const dispatcherAddresses = CHAIN_CONFIGS[chainId].dispatchers;
for (const dispatcherAddress of dispatcherAddresses) {
const clients = CHAIN_CONFIGS[chainId].clients;

for (let i=0; i < dispatcherAddresses.length; i++) {
let dispatcherAddress = dispatcherAddresses[i];
let client = clients[i];
const provider = new CachingJsonRpcProvider(CHAIN_CONFIGS[chainId].rpc, CHAIN_CONFIGS[chainId].id);
const block = await provider.getBlock('latest');
const blockNumber = block!.number;
const fromBlock = blockNumber - getLookbackTime() / CHAIN_CONFIGS[chainId].blockTime;

srcChainProviders[chainId] = provider;
const contract = new ethers.Contract(dispatcherAddress, Abi.abi, provider);
srcChainContracts.push([contract, chainId, fromBlock]);
srcChainContracts.push([contract, chainId, fromBlock, client]);

const newSendLogs = (await contract.queryFilter('SendPacket', fromBlock, 'latest')) as Array<ethers.EventLog>;
sendLogs = sendLogs.concat(newSendLogs.map((eventLog) => [eventLog, chainId]));
sendLogs = sendLogs.concat(newSendLogs.map((eventLog) => [eventLog, chainId, client]));
}
}
const tmClient = await GetTmClient();

// Collect all packets and their properties from the send logs
const unprocessedPacketKeys = new Set<string>();
const packets: Record<string, Packet> = {};

async function processSendLog(sendLog: [ethers.EventLog, CHAIN]) {
const [sendEvent, srcChain] = sendLog;
console.log(`Processing ${sendLogs.length} send logs...`)
async function processSendLog(sendLog: [ethers.EventLog, CHAIN, string]) {
const [sendEvent, srcChain, client] = sendLog;
let [srcPortAddress, srcChannelId, packet, sequence, timeout, fee] = sendEvent.args;
srcChannelId = ethers.decodeBytes32String(srcChannelId);
const portId = `polyibc.${client}.${srcPortAddress.slice(2)}`;

if (!validChannelIds.has(srcChannelId)) {
let channel;
try {
console.log(portId, srcChannelId)
channel = await tmClient.ibc.channel.channel(portId, srcChannelId);
} catch (e) {
console.log('Skipping packet for channel: ', srcChannelId);
return;
}

const channel = openChannels.find(channel => (
channel.channelId === srcChannelId &&
channel.portId.startsWith(`polyibc.${srcChain}`) &&
channel.portId.endsWith(srcPortAddress.slice(2))
));
if (!channel) {
if (!channel.channel) {
console.warn('No channel found for packet: ', srcChannelId, srcPortAddress);
return;
}

const key = `${srcPortAddress}-${srcChannelId}-${sequence}`;
const key = `${portId}-${srcChannelId}-${sequence}`;
console.log("send key", key)
const srcProvider = srcChainProviders[srcChain];
const srcBlock = await srcProvider.getBlock(sendEvent.blockNumber);

packets[key] = {
sourcePortAddress: srcPortAddress,
sourceChannelId: srcChannelId,
destPortAddress: channel.counterparty.portId,
destChannelId: channel.counterparty.channelId,
destPortAddress: channel.channel.counterparty.portId,
destChannelId: channel.channel.counterparty.channelId,
fee,
sequence: sequence.toString(),
timeout: timeout.toString(),
id: key,
state: PacketStates.SENT,
createTime: srcBlock!.timestamp,
sendTx: sendEvent.transactionHash,
sourceChain: channel.portId.split('.')[1],
destChain: channel.counterparty.portId.split('.')[1]
sourceChain: srcChain,
destChain: channel.channel.counterparty.portId.split('.')[1]
};
unprocessedPacketKeys.add(key);
}

await Promise.allSettled(sendLogs.map(processSendLog));

/*
** Find the state of each packet
** States could be like:
** SENT (event on L2), POLY_RECV (received by Polymer), POLY_SENT (committed on Polymer), RECV (event on L2),
** WRITE_ACK (event on L2), POLY_ACK_RECV (ack received on Polymer), POLY_WRITE_ACK (ack written on Polymer), ACK (event on L2)
**
** To set a proper state for each packet, start with SENT state for all relevant packets, then:
** For each packet go into the reverse direction of the packet flow starting from ACK
** If a packet reached the corresponding state, set it as the state for the packet and move on to the next packet
** Otherwise move to the next state until SENT state is reached
*/

// Start by searching for ack events on the source chains
const ackLogsPromises = srcChainContracts.map(async ([contract, chain, fromBlock]) => {
const ackLogsPromises = srcChainContracts.map(async ([contract, chain, fromBlock, client]) => {
const newAckLogs = (await contract.queryFilter('Acknowledgement', fromBlock, 'latest')) as Array<ethers.EventLog>;
return newAckLogs.map((eventLog) => [eventLog, chain] as [ethers.EventLog, CHAIN]);
return newAckLogs.map((eventLog) => [eventLog, chain, client] as [ethers.EventLog, CHAIN, string]);
});

const ackLogsResults = await Promise.allSettled(ackLogsPromises);
let ackLogs: Array<[ethers.EventLog, CHAIN]> = [];
let ackLogs: Array<[ethers.EventLog, CHAIN, string]> = [];

for (const result of ackLogsResults) {
if (result.status === 'fulfilled') {
ackLogs = ackLogs.concat(result.value);
}
}


async function processAckLog(ackLog: [ethers.EventLog, CHAIN]) {
const [ackEvent, srcChain] = ackLog;
async function processAckLog(ackLog: [ethers.EventLog, CHAIN, string]) {
const [ackEvent, srcChain, client] = ackLog;
let [srcPortAddress, srcChannelId, sequence] = ackEvent.args;
const key = `${srcPortAddress}-${ethers.decodeBytes32String(srcChannelId)}-${sequence}`;
const key = `polyibc.${client}.${srcPortAddress.slice(2)}-${ethers.decodeBytes32String(srcChannelId)}-${sequence}`;

if (!packets[key]) {
console.log('No packet found for ack: ', key);
Expand All @@ -165,52 +140,61 @@ export async function GET(request: NextRequest) {

// Set up providers and contracts to interact with the destination chains
let destChainProviders: Record<CHAIN, CachingJsonRpcProvider> = {} as Record<CHAIN, CachingJsonRpcProvider>;
let destChainContracts: Array<[ethers.Contract, CHAIN, number]> = [];
let destChainContracts: Array<[ethers.Contract, CHAIN, number, string]> = [];
for (const chain in CHAIN_CONFIGS) {
const chainId = chain as CHAIN;
const dispatcherAddresses = CHAIN_CONFIGS[chainId].dispatchers;
for (const dispatcherAddress of dispatcherAddresses) {
const clients = CHAIN_CONFIGS[chainId].clients;

for (let i=0; i < dispatcherAddresses.length; i++) {
let dispatcherAddress = dispatcherAddresses[i];
let client = clients[i];
const provider = new CachingJsonRpcProvider(CHAIN_CONFIGS[chainId].rpc, CHAIN_CONFIGS[chainId].id);
const block = await provider.getBlock('latest');
const blockNumber = block!.number;
const fromBlock = blockNumber - getLookbackTime() / CHAIN_CONFIGS[chainId].blockTime;

destChainProviders[chainId] = provider;
const contract = new ethers.Contract(dispatcherAddress, Abi.abi, provider);
destChainContracts.push([contract, chainId, fromBlock]);
destChainContracts.push([contract, chainId, fromBlock, client]);
}
}

const writeAckLogsPromises = destChainContracts.map(async ([destContract, destChain, fromBlock]) => {
const writeAckLogsPromises = destChainContracts.map(async ([destContract, destChain, fromBlock, client]) => {
const newWriteAckLogs = await destContract.queryFilter('WriteAckPacket', fromBlock, 'latest');
return newWriteAckLogs.map((eventLog) => [eventLog, destChain] as [ethers.EventLog, CHAIN]);
return newWriteAckLogs.map((eventLog) => [eventLog, destChain, client] as [ethers.EventLog, CHAIN, string]);
});

let writeAckLogs: Array<[ethers.EventLog, CHAIN]> = [];
let writeAckLogs: Array<[ethers.EventLog, CHAIN, string]> = [];
for (const result of await Promise.allSettled(writeAckLogsPromises)) {
if (result.status === 'fulfilled') {
writeAckLogs = writeAckLogs.concat(result.value);
}
}

console.log("Processing write ack logs...")

for (const writeAckLog of writeAckLogs) {
const [writeAckEvent, destChain] = writeAckLog;
const [receiver, destChannelId, sequence, ack] = writeAckEvent.args;

const channel = openChannels.find((channel) => {
return (
channel.counterparty.channelId === ethers.decodeBytes32String(destChannelId) &&
channel.counterparty.portId.startsWith(`polyibc.${destChain}`) &&
channel.counterparty.portId.endsWith(receiver.slice(2))
);
});

if (!channel) {
console.log('Unable to find channel for write ack: ', destChannelId, 'receiver: ', receiver);
const [writeAckEvent, destChain, client] = writeAckLog;
let [receiver, destChannelId, sequence, ack] = writeAckEvent.args;
destChannelId = ethers.decodeBytes32String(destChannelId);

let channel
try {
console.log(`polyibc.${client}.${receiver.slice(2)}`, destChannelId)
channel = await tmClient.ibc.channel.channel(`polyibc.${client}.${receiver.slice(2)}`, destChannelId);
} catch (e) {
console.log('Skipping packet for channel: ', destChannelId);
continue;
}

if (!channel.channel) {
console.warn('No channel found for write ack: ', destChannelId, receiver);
continue;
}

const key = `${channel.portId}-${channel.channelId}-${sequence}`;
const key = `${channel.channel.counterparty.portId}-${channel.channel.counterparty.channelId}-${sequence}`;
console.log("write ack key", key)
if (key in unprocessedPacketKeys) {
packets[key].state = PacketStates.WRITE_ACK;
unprocessedPacketKeys.delete(key);
Expand All @@ -219,36 +203,41 @@ export async function GET(request: NextRequest) {

// Match any recv packet events on destination chains to packets
const recvPacketPromises = destChainContracts.map(async (destChainContract) => {
const [destContract, destChain, fromBlock] = destChainContract;
const [destContract, destChain, fromBlock, client] = destChainContract;
const newRecvPacketLogs = await destContract.queryFilter('RecvPacket', fromBlock, 'latest');
return newRecvPacketLogs.map((eventLog) => [eventLog, destChain] as [ethers.EventLog, CHAIN]);
return newRecvPacketLogs.map((eventLog) => [eventLog, destChain, client] as [ethers.EventLog, CHAIN, string]);
});

let recvPacketLogs: Array<[ethers.EventLog, CHAIN]> = [];
let recvPacketLogs: Array<[ethers.EventLog, CHAIN, string]> = [];
for (const result of await Promise.allSettled(recvPacketPromises)) {
if (result.status === 'fulfilled') {
recvPacketLogs = recvPacketLogs.concat(result.value);
}
}

console.log(`Processing ${recvPacketLogs.length} recv packet logs...`)
const promises = recvPacketLogs.map(async (recvPacketLog) => {
const [recvPacketEvent, destChain] = recvPacketLog;
const [destPortAddress, destChannelId, sequence] = recvPacketEvent.args;

const channel = openChannels.find((channel) => {
return (
channel.counterparty.channelId === ethers.decodeBytes32String(destChannelId) &&
channel.counterparty.portId.startsWith(`polyibc.${destChain}`) &&
channel.counterparty.portId.endsWith(destPortAddress.slice(2))
);
});

if (!channel) {
console.log("Unable to find channel for recv packet: ", destChannelId, "receiver: ", destPortAddress);
return;
const [recvPacketEvent, destChain, client] = recvPacketLog;
let [destPortAddress, destChannelId, sequence] = recvPacketEvent.args;
destChannelId = ethers.decodeBytes32String(destChannelId);

let channel;
try {
console.log(`polyibc.${client}.${destPortAddress.slice(2)}`, destChannelId)
channel = await tmClient.ibc.channel.channel(`polyibc.${client}.${destPortAddress.slice(2)}`, destChannelId);
} catch (e) {
console.log('Skipping packet for channel: ', destChannelId);
return
}

if (!channel.channel) {
console.warn('No channel found for write ack: ', destChannelId, destPortAddress);
return
}

const key = `0x${channel.portId.split(".")[2]}-${channel.channelId}-${sequence}`;
const key = `${channel.channel.counterparty.portId}-${channel.channel.counterparty.channelId}-${sequence}`;
console.log("recv key", key)

if (packets[key]) {
const recvBlock = await destChainProviders[destChain].getBlock(recvPacketEvent.blockNumber);

Expand All @@ -262,6 +251,8 @@ export async function GET(request: NextRequest) {
}

packets[key].rcvTx = recvPacketEvent.transactionHash;
} else {
console.log('No packet found for recv: ', key);
}
});

Expand Down
8 changes: 8 additions & 0 deletions app/utils/chains/configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ let baseDispatcher = process.env.DISPATCHER_ADDRESS_BASE!;
let opDispatcherSimClient = process.env.DISPATCHER_ADDRESS_OPTIMISM_SIMCLIENT!;
let baseDispatcherSimClient = process.env.DISPATCHER_ADDRESS_BASE_SIMCLIENT!;


let opClientName = process.env.OPTIMISM_CLIENT_NAME!;
let baseClientName = process.env.BASE_CLIENT_NAME!;
let opClientSimClientName = process.env.OPTIMISM_CLIENT_SIMCLIENT_NAME!;
let baseClientSimClientName = process.env.BASE_CLIENT_SIMCLIENT_NAME!;

let optimismRPC =
process.env.OPTIMISM_RPC ||
'https://opt-sepolia.g.alchemy.com/v2/iMhzwCPw18v9Byeh59cedtUKbul_jFF3'; // "https://opt-sepolia.g.alchemy.com/v2/jKvLhhXvtnWdNeZrKst0demxnwJcYH1o"
Expand All @@ -23,6 +29,7 @@ export const CHAIN_CONFIGS: {
display: 'Optimism',
rpc: optimismRPC,
dispatchers: [opDispatcher, opDispatcherSimClient],
clients: [opClientName, opClientSimClientName],
blockTime: 2,
icon: OptimismIcon,
},
Expand All @@ -31,6 +38,7 @@ export const CHAIN_CONFIGS: {
display: 'Base',
rpc: baseRPC,
dispatchers: [baseDispatcher, baseDispatcherSimClient],
clients: [baseClientName, baseClientSimClientName],
blockTime: 2,
icon: BaseIcon,
},
Expand Down
1 change: 1 addition & 0 deletions app/utils/types/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface Chain {
display: string;
rpc: string;
dispatchers: string[];
clients: string[];
blockTime: number;
icon: () => JSX.Element;
}

0 comments on commit 217fb8e

Please sign in to comment.