diff --git a/src/parser/index.ts b/src/parser/index.ts index d7b83664..87b010d2 100644 --- a/src/parser/index.ts +++ b/src/parser/index.ts @@ -265,7 +265,7 @@ export default class Parser { static parseC(data: any) { if (data.timedContinuationData) - return new TimedContinuation(data.timedContinuationData); + return new Continuation({ continuation: data.timedContinuationData, type: 'timed' }); } static parseLC(data: any) { @@ -489,16 +489,20 @@ export class PlaylistPanelContinuation extends YTNode { } } -export class TimedContinuation extends YTNode { - static readonly type = 'timedContinuationData'; +export class Continuation extends YTNode { + static readonly type = 'continuation'; - timeout_ms: number; + continuation_type: string; + timeout_ms?: number; + time_until_last_message_ms?: number; token: string; constructor(data: any) { super(); - this.timeout_ms = data.timeoutMs || data.timeUntilLastMessageMsec; - this.token = data.continuation; + this.continuation_type = data.type; + this.timeout_ms = data.continuation?.timeoutMs; + this.time_until_last_message_ms = data.continuation?.timeUntilLastMessageMsec; + this.token = data.continuation?.continuation; } } @@ -517,7 +521,7 @@ export class LiveChatContinuation extends YTNode { search_terms: string[]; image: Thumbnail[]; }[]; - continuation: TimedContinuation; + continuation: Continuation; viewer_name: string; constructor(data: any) { @@ -541,11 +545,20 @@ export class LiveChatContinuation extends YTNode { is_custom_emoji: emoji.isCustomEmoji })) || []; - this.continuation = new TimedContinuation( - data.continuations?.[0].timedContinuationData || - data.continuations?.[0].invalidationContinuationData || - data.continuations?.[0].liveChatReplayContinuationData - ); + let continuation, type; + + if (data.continuations?.[0].timedContinuationData) { + type = 'timed'; + continuation = data.continuations?.[0].timedContinuationData; + } else if (data.continuations?.[0].invalidationContinuationData) { + type = 'invalidation'; + continuation = data.continuations?.[0].invalidationContinuationData; + } else if (data.continuations?.[0].liveChatReplayContinuationData) { + type = 'replay'; + continuation = data.continuations?.[0].liveChatReplayContinuationData; + } + + this.continuation = new Continuation({ continuation, type }); this.viewer_name = data.viewerName; } diff --git a/src/parser/youtube/LiveChat.ts b/src/parser/youtube/LiveChat.ts index 99a7e647..e795af3e 100644 --- a/src/parser/youtube/LiveChat.ts +++ b/src/parser/youtube/LiveChat.ts @@ -1,6 +1,7 @@ import EventEmitter from '../../utils/EventEmitterLike'; import Parser, { LiveChatContinuation, ParsedResponse } from '../index'; import VideoInfo from './VideoInfo'; +import SmoothedQueue from './SmoothedQueue'; import AddChatItemAction from '../classes/livechat/AddChatItemAction'; import AddLiveChatTickerItemAction from '../classes/livechat/AddLiveChatTickerItemAction'; @@ -51,11 +52,14 @@ export interface LiveMetadata { } class LiveChat extends EventEmitter { + smoothed_queue: SmoothedQueue; + #actions: Actions; #video_id: string; #channel_id: string; #continuation?: string; #mcontinuation?: string; + #retry_count = 0; initial_info?: LiveChatContinuation; metadata?: LiveMetadata; @@ -71,6 +75,31 @@ class LiveChat extends EventEmitter { this.#actions = video_info.actions; this.#continuation = video_info.livechat?.continuation; this.is_replay = video_info.livechat?.is_replay || false; + this.smoothed_queue = new SmoothedQueue(); + + this.smoothed_queue.callback = async (actions: YTNode[]) => { + if (!actions.length) { + // Wait 2 seconds before requesting an incremental continuation if the action group is empty. + await this.#wait(2000); + } else if (actions.length < 10) { + // If there are less than 10 actions, wait until all of them are emitted. + await this.#emitSmoothedActions(actions); + } else if (this.is_replay) { + /** + * NOTE: Live chat replays require data from the video player for actions to be emitted timely + * and as we don't have that, this ends up being quite innacurate. + */ + this.#emitSmoothedActions(actions); + await this.#wait(2000); + } else { + // There are more than 10 actions, emit them asynchonously so we can request the next incremental continuation. + this.#emitSmoothedActions(actions); + } + + if (this.running) { + this.#pollLivechat(); + } + }; } on(type: 'start', listener: (initial_data: LiveChatContinuation) => void): void; @@ -82,6 +111,15 @@ class LiveChat extends EventEmitter { super.on(type, listener); } + once(type: 'start', listener: (initial_data: LiveChatContinuation) => void): void; + once(type: 'chat-update', listener: (action: ChatAction) => void): void; + once(type: 'metadata-update', listener: (metadata: LiveMetadata) => void): void; + once(type: 'error', listener: (err: Error) => void): void; + once(type: 'end', listener: () => void): void; + once(type: string, listener: (...args: any[]) => void): void { + super.once(type, listener); + } + start() { if (!this.running) { this.running = true; @@ -91,17 +129,25 @@ class LiveChat extends EventEmitter { } stop() { + this.smoothed_queue.clear(); this.running = false; } #pollLivechat() { (async () => { try { - const endpoint = this.is_replay ? 'live_chat/get_live_chat_replay' : 'live_chat/get_live_chat'; - const response = await this.#actions.execute(endpoint, { continuation: this.#continuation }); + const response = await this.#actions.execute( + this.is_replay ? 'live_chat/get_live_chat_replay' : 'live_chat/get_live_chat', + { continuation: this.#continuation, parse: true } + ); - const data = Parser.parseResponse(response.data); - const contents = data.continuation_contents; + const contents = response.continuation_contents; + + if (!contents) { + this.emit('error', new InnertubeError('Unexpected live chat incremental continuation response', response)); + this.emit('end'); + this.stop(); + } if (!(contents instanceof LiveChatContinuation)) { this.stop(); @@ -115,31 +161,31 @@ class LiveChat extends EventEmitter { if (contents.header) { this.initial_info = contents; this.emit('start', contents); + if (this.running) + this.#pollLivechat(); } else { - await this.#emitSmoothedActions(contents.actions); + this.smoothed_queue.enqueueActionGroup(contents.actions); } - /** - * If there are no actions then we wait 1000 milliseconds, otherwise - * the amount of items on the action queue will determine the polling interval. - */ - if (!contents.actions.length && !contents.header) - await this.#wait(1000); - - if (this.running) - this.#pollLivechat(); + this.#retry_count = 0; } catch (err) { - this.emit('error', new InnertubeError('Failed to poll livechat, retrying...', err)); - await this.#wait(2000); - if (this.running) + this.emit('error', err); + + if (this.#retry_count++ < 10) { + await this.#wait(2000); this.#pollLivechat(); + } else { + this.emit('error', new InnertubeError('Reached retry limit for incremental continuation requests', err)); + this.emit('end'); + this.stop(); + } } })(); } /** * Ensures actions are emitted at the right speed. - * This was adapted from YouTube's compiled code (Android & Web). + * This and {@link SmoothedQueue} were based off of YouTube's own implementation. */ async #emitSmoothedActions(action_queue: YTNode[]) { const base = 1E4; @@ -192,7 +238,6 @@ class LiveChat extends EventEmitter { if (this.running) this.#pollMetadata(); } catch (err) { - this.emit('error', new InnertubeError('Failed to poll live metadata, retrying...', err)); await this.#wait(2000); if (this.running) this.#pollMetadata(); @@ -209,11 +254,12 @@ class LiveChat extends EventEmitter { params: Proto.encodeMessageParams(this.#channel_id, this.#video_id), richMessage: { textSegments: [ { text } ] }, clientMessageId: uuidv4(), + client: 'ANDROID', parse: true }); if (!response.actions) - throw new InnertubeError('Response did not have an "actions" property. The call may have failed.'); + throw new InnertubeError('Unexpected response from send_message', response); return response.actions.array().as(AddChatItemAction); } diff --git a/src/parser/youtube/SmoothedQueue.ts b/src/parser/youtube/SmoothedQueue.ts new file mode 100644 index 00000000..c0ed483d --- /dev/null +++ b/src/parser/youtube/SmoothedQueue.ts @@ -0,0 +1,159 @@ +import { YTNode } from '../helpers'; + +/** + * Flattens the given queue. + * @param queue - The queue to flatten. + */ +function flattenQueue(queue: YTNode[][]) { + const nodes: YTNode[] = []; + + for (const group of queue) { + if (Array.isArray(group)) { + for (const node of group) { + nodes.push(node); + } + } else { + nodes.push(group); + } + } + + return nodes; +} + +class DelayQueue { + front: number[]; + back: number[]; + + constructor() { + this.front = []; + this.back = []; + } + + public isEmpty(): boolean { + return !this.front.length && !this.back.length; + } + + public clear(): void { + this.front = []; + this.back = []; + } + + public getValues(): number[] { + return this.front.concat(this.back.reverse()); + } +} + +class SmoothedQueue { + #last_update_time: number | null; + #estimated_update_interval: number | null; + #callback: Function | null; + #action_queue: YTNode[][]; + #next_update_id: any; + #poll_response_delay_queue: DelayQueue; + + constructor() { + this.#last_update_time = null; + this.#estimated_update_interval = null; + this.#callback = null; + this.#action_queue = []; + this.#next_update_id = null; + this.#poll_response_delay_queue = new DelayQueue(); + } + + public enqueueActionGroup(group: YTNode[]): void { + if (this.#last_update_time !== null) { + const delay = Date.now() - this.#last_update_time; + + this.#poll_response_delay_queue.back.push(delay); + + if (5 < (this.#poll_response_delay_queue.front.length + this.#poll_response_delay_queue.back.length)) { + if (!this.#poll_response_delay_queue.front.length) { + this.#poll_response_delay_queue.front = this.#poll_response_delay_queue.back; + this.#poll_response_delay_queue.front.reverse(); + this.#poll_response_delay_queue.back = []; + } + + this.#poll_response_delay_queue.front.pop(); + } + + this.#estimated_update_interval = Math.max(...this.#poll_response_delay_queue.getValues()); + } + + this.#last_update_time = Date.now(); + + this.#action_queue.push(group); + + if (this.#next_update_id === null) { + this.#next_update_id = setTimeout(this.emitSmoothedActions.bind(this)); + } + } + + public emitSmoothedActions(): void { + this.#next_update_id = null; + + if (this.#action_queue.length) { + let delay = 1E4; + + if (this.#estimated_update_interval !== null && this.#last_update_time !== null) { + delay = this.#estimated_update_interval - Date.now() + this.#last_update_time; + } + + delay = this.#action_queue.length < delay / 80 ? 1 : Math.ceil(this.#action_queue.length / (delay / 80)); + + const actions = flattenQueue(this.#action_queue.splice(0, delay)); + + if (this.#callback) { + this.#callback(actions); + } + + if (this.#action_queue !== null) { + delay == 1 ? ( + delay = this.#estimated_update_interval as number / this.#action_queue.length, + delay *= Math.random() + 0.5, + delay = Math.min(1E3, delay), + delay = Math.max(80, delay) + ) : delay = 80; + + this.#next_update_id = setTimeout(this.emitSmoothedActions.bind(this), delay); + } + } + } + + public clear() { + if (this.#next_update_id !== null) { + clearTimeout(this.#next_update_id); + this.#next_update_id = null; + } + this.#action_queue = []; + } + + set callback(cb: Function | null) { + this.#callback = cb; + } + + get callback(): Function | null { + return this.#callback; + } + + get action_queue(): YTNode[][] { + return this.#action_queue; + } + + get estimated_update_interval(): number | null { + return this.#estimated_update_interval; + } + + get last_update_time(): number | null { + return this.#last_update_time; + } + + get next_update_id(): any { + return this.#next_update_id; + } + + get poll_response_delay_queue(): DelayQueue { + return this.#poll_response_delay_queue; + } +} + +export default SmoothedQueue; \ No newline at end of file