Skip to content

Commit

Permalink
Upgrade @distube/ytdl-core to version 4.15.6 and introduce caching to…
Browse files Browse the repository at this point in the history
… the stream verification service
  • Loading branch information
nukeop committed Jan 9, 2025
1 parent 8489271 commit 40434d7
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 29 deletions.
17 changes: 9 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions packages/app/app/actions/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { RootState } from '../reducers';
import { LocalLibraryState } from './local';
import { Queue } from './actionTypes';
import StreamProviderPlugin from '@nuclear/core/src/plugins/streamProvider';
import { isTopStream, TopStream } from '@nuclear/core/src/rest/Nuclear/StreamMappings';
import { isSuccessCacheEntry } from '@nuclear/core/src/rest/Nuclear/StreamMappings';
import { queue as queueSelector } from '../selectors/queue';

type LocalTrack = Track & {
Expand Down Expand Up @@ -184,18 +184,18 @@ export const findStreamsForTrack = (index: number) => async (dispatch, getState)

if (settings.useStreamVerification) {
try {
const StreamMappingsService = new rest.NuclearStreamMappingsService(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);
const StreamMappingsService = rest.NuclearStreamMappingsService.get(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);
const topStream = await StreamMappingsService.getTopStream(
track.artist,
track.name,
selectedStreamProvider.sourceName,
settings?.userId
);
// Use the top stream ID and put it at the top of the list
if (isTopStream(topStream.body)) {
if (isSuccessCacheEntry(topStream)) {
streamData = [
streamData.find(stream => stream.id === (topStream.body as TopStream).stream_id),
...streamData.filter(stream => stream.id !== (topStream.body as TopStream).stream_id)
streamData.find(stream => stream.id === topStream.value.stream_id),
...streamData.filter(stream => stream.id !== topStream.value.stream_id)
];
}
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AnyProps } from '../../../test/testUtils';
import { StreamVerificationContainer } from '.';
import { buildStoreState } from '../../../test/storeBuilders';
import { configureMockStore, TestStoreProvider, setupI18Next } from '../../../test/testUtils';
import { NuclearStreamMappingsService } from '@nuclear/core/src/rest';

describe('StreamVerificationContainer', () => {
beforeAll(() => {
Expand All @@ -15,6 +16,7 @@ describe('StreamVerificationContainer', () => {

beforeEach(() => {
fetchMock.reset();
NuclearStreamMappingsService.get(process.env.NUCLEAR_VERIFICATION_SERVICE_URL).clearTopStreamCache();
});

it('renders nothing when nothing\'s playing', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { head } from 'lodash';
import { pluginsSelectors } from '../../selectors/plugins';
import { settingsSelector } from '../../selectors/settings';
import { setStringOption } from '../../actions/settings';
import { isResponseBody } from '@nuclear/core/src/rest/Nuclear/NuclearService';
import { isSuccessCacheEntry } from '@nuclear/core/src/rest/Nuclear/StreamMappings';

const WEAK_VERIFICATION_THRESHOLD = 3;

Expand All @@ -27,7 +27,7 @@ export const StreamVerificationContainer: React.FC = () => {
const currentTrack: QueueItem = queue.queueItems[queue.currentSong];
const [isLoading, setLoading] = useState(false);
const [verificationStatus, setVerificationStatus] = useState<StreamVerificationProps['status']>('unknown');
const StreamMappingsService = new rest.NuclearStreamMappingsService(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);
const StreamMappingsService = rest.NuclearStreamMappingsService.get(process.env.NUCLEAR_VERIFICATION_SERVICE_URL);

useEffect(() => {
setVerificationStatus('unknown');
Expand All @@ -37,14 +37,14 @@ export const StreamVerificationContainer: React.FC = () => {
currentTrack.name,
selectedStreamProvider,
settings?.userId
).then(res => {
if (isResponseBody(res) && res.body.stream_id === head(currentTrack.streams)?.id) {
if (res.body.score === undefined) {
).then(topStream => {
if (isSuccessCacheEntry(topStream) && topStream.value.stream_id === head(currentTrack.streams)?.id) {
if (topStream.value.score === undefined) {
logger.error(`Failed to verify stream: ${currentTrack.name} by ${getTrackArtist(currentTrack)}`);
setVerificationStatus('unverified');
} else if (res.body.self_verified) {
} else if (topStream.value.self_verified) {
setVerificationStatus('verified_by_user');
} else if (res.body.score < WEAK_VERIFICATION_THRESHOLD) {
} else if (topStream.value.score < WEAK_VERIFICATION_THRESHOLD) {
setVerificationStatus('weakly_verified');
} else {
setVerificationStatus('verified');
Expand Down
4 changes: 2 additions & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"@babel/core": "^7.9.0",
"@babel/preset-env": "^7.9.5",
"@babel/preset-react": "^7.9.4",
"@distube/ytdl-core": "4.15.8",
"@distube/ytdl-core": "4.15.6",
"@nuclear/scanner": "^0.6.40",
"@supabase/supabase-js": "^1.35.4",
"ajv": "^6.12.5",
Expand Down Expand Up @@ -63,4 +63,4 @@
"ts-node": "^10.7.0",
"typescript": "^4.2.4"
}
}
}
89 changes: 82 additions & 7 deletions packages/core/src/rest/Nuclear/StreamMappings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,111 @@ type ErrorBody = {
error: string;
}

export const isTopStream = (data: TopStream | ErrorBody): data is TopStream => {
return has(data, 'stream_id');

type SuccessCacheEntry = {
type: 'success';
value: TopStream;
timestamp: number;
}

type ErrorCacheEntry = {
type: 'error';
error: ErrorBody;
timestamp: number;
}

type StreamCacheEntry = SuccessCacheEntry | ErrorCacheEntry;

export const isSuccessCacheEntry = (data: StreamCacheEntry): data is SuccessCacheEntry => {
return data.type === 'success';
};

export const isErrorCacheEntry = (data: StreamCacheEntry): data is ErrorCacheEntry => {
return data.type === 'error';
};

export class NuclearStreamMappingsService extends NuclearService {
constructor(baseUrl: string) {
private static instance: NuclearStreamMappingsService;
private topStreamCache = new Map<string, StreamCacheEntry>();
private readonly CACHE_TTL = 5 * 60 * 1000; // 5 minutes in milliseconds

private constructor(baseUrl: string) {
super(baseUrl);
}

async getTopStream(artist: string, title: string, source: string, author_id: string){
return this.getJson<TopStream, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/top-stream`, {
static get(baseUrl: string): NuclearStreamMappingsService {
if (!NuclearStreamMappingsService.instance) {
NuclearStreamMappingsService.instance = new NuclearStreamMappingsService(baseUrl);
}

return NuclearStreamMappingsService.instance;
}

private createCacheKey(artist: string, title: string, source: string): string {
return `${artist}:${title}:${source}`;
}

private isValidCacheEntry(entry: StreamCacheEntry): boolean {
return Date.now() - entry.timestamp < this.CACHE_TTL;
}

clearTopStreamCache(): void {
this.topStreamCache.clear();
}

invalidateTopStreamCache(artist: string, title: string, source: string, author_id: string): void {
const cacheKey = this.createCacheKey(artist, title, source);
this.topStreamCache.delete(cacheKey);
}

async getTopStream(artist: string, title: string, source: string, author_id: string) {
const cacheKey = this.createCacheKey(artist, title, source);
const cachedEntry = this.topStreamCache.get(cacheKey);

if (cachedEntry && this.isValidCacheEntry(cachedEntry)) {
return cachedEntry;
}

if (cachedEntry) {
this.topStreamCache.delete(cacheKey);
}

const result = await this.getJson<TopStream, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/top-stream`, {
headers: this.getHeaders(),
method: 'POST',
body: JSON.stringify({ artist, title, source, author_id })
}));

if (result.ok) {
const cacheEntry: SuccessCacheEntry = { type: 'success', value: result.body as TopStream, timestamp: Date.now() };
this.topStreamCache.set(this.createCacheKey(artist, title, source), cacheEntry);
return cacheEntry;
} else {
const cacheEntry: ErrorCacheEntry = { type: 'error', error: result.body as ErrorBody, timestamp: Date.now() };
this.topStreamCache.set(this.createCacheKey(artist, title, source), cacheEntry);
return cacheEntry;
}
}

async postStreamMapping(mapping: PostStreamMappingPayload) {
return this.getJson<void, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/verify`, {
const result = await this.getJson<void, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/verify`, {
headers: this.getHeaders(),
method: 'POST',
body: JSON.stringify(mapping)
}));

this.invalidateTopStreamCache(mapping.artist, mapping.title, mapping.source, mapping.author_id);
return result;
}

async deleteStreamMapping(mapping: DeleteStreamMappingPayload) {
return this.getJson<void, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/unverify`, {
const result = await this.getJson<void, ErrorBody>(fetch(`${this.baseUrl}/stream-mappings/unverify`, {
headers: this.getHeaders(),
method: 'DELETE',
body: JSON.stringify(mapping)
}));

this.invalidateTopStreamCache(mapping.artist, mapping.title, mapping.source, mapping.author_id);
return result;
}
}

0 comments on commit 40434d7

Please sign in to comment.