mirror of
https://github.com/LuanRT/YouTube.js.git
synced 2026-06-19 04:21:35 +00:00
refactor: improve live chat polling and error handling (#287)
This commit is contained in:
@@ -265,7 +265,7 @@ export default class Parser {
|
||||
|
||||
static parseC(data: any) {
|
||||
if (data.timedContinuationData)
|
||||
return new TimedContinuation(data.timedContinuationData);
|
||||
return new Continuation({ continuation: data.timedContinuationData, type: 'timed' });
|
||||
}
|
||||
|
||||
static parseLC(data: any) {
|
||||
@@ -489,16 +489,20 @@ export class PlaylistPanelContinuation extends YTNode {
|
||||
}
|
||||
}
|
||||
|
||||
export class TimedContinuation extends YTNode {
|
||||
static readonly type = 'timedContinuationData';
|
||||
export class Continuation extends YTNode {
|
||||
static readonly type = 'continuation';
|
||||
|
||||
timeout_ms: number;
|
||||
continuation_type: string;
|
||||
timeout_ms?: number;
|
||||
time_until_last_message_ms?: number;
|
||||
token: string;
|
||||
|
||||
constructor(data: any) {
|
||||
super();
|
||||
this.timeout_ms = data.timeoutMs || data.timeUntilLastMessageMsec;
|
||||
this.token = data.continuation;
|
||||
this.continuation_type = data.type;
|
||||
this.timeout_ms = data.continuation?.timeoutMs;
|
||||
this.time_until_last_message_ms = data.continuation?.timeUntilLastMessageMsec;
|
||||
this.token = data.continuation?.continuation;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -517,7 +521,7 @@ export class LiveChatContinuation extends YTNode {
|
||||
search_terms: string[];
|
||||
image: Thumbnail[];
|
||||
}[];
|
||||
continuation: TimedContinuation;
|
||||
continuation: Continuation;
|
||||
viewer_name: string;
|
||||
|
||||
constructor(data: any) {
|
||||
@@ -541,11 +545,20 @@ export class LiveChatContinuation extends YTNode {
|
||||
is_custom_emoji: emoji.isCustomEmoji
|
||||
})) || [];
|
||||
|
||||
this.continuation = new TimedContinuation(
|
||||
data.continuations?.[0].timedContinuationData ||
|
||||
data.continuations?.[0].invalidationContinuationData ||
|
||||
data.continuations?.[0].liveChatReplayContinuationData
|
||||
);
|
||||
let continuation, type;
|
||||
|
||||
if (data.continuations?.[0].timedContinuationData) {
|
||||
type = 'timed';
|
||||
continuation = data.continuations?.[0].timedContinuationData;
|
||||
} else if (data.continuations?.[0].invalidationContinuationData) {
|
||||
type = 'invalidation';
|
||||
continuation = data.continuations?.[0].invalidationContinuationData;
|
||||
} else if (data.continuations?.[0].liveChatReplayContinuationData) {
|
||||
type = 'replay';
|
||||
continuation = data.continuations?.[0].liveChatReplayContinuationData;
|
||||
}
|
||||
|
||||
this.continuation = new Continuation({ continuation, type });
|
||||
|
||||
this.viewer_name = data.viewerName;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import EventEmitter from '../../utils/EventEmitterLike';
|
||||
import Parser, { LiveChatContinuation, ParsedResponse } from '../index';
|
||||
import VideoInfo from './VideoInfo';
|
||||
import SmoothedQueue from './SmoothedQueue';
|
||||
|
||||
import AddChatItemAction from '../classes/livechat/AddChatItemAction';
|
||||
import AddLiveChatTickerItemAction from '../classes/livechat/AddLiveChatTickerItemAction';
|
||||
@@ -51,11 +52,14 @@ export interface LiveMetadata {
|
||||
}
|
||||
|
||||
class LiveChat extends EventEmitter {
|
||||
smoothed_queue: SmoothedQueue;
|
||||
|
||||
#actions: Actions;
|
||||
#video_id: string;
|
||||
#channel_id: string;
|
||||
#continuation?: string;
|
||||
#mcontinuation?: string;
|
||||
#retry_count = 0;
|
||||
|
||||
initial_info?: LiveChatContinuation;
|
||||
metadata?: LiveMetadata;
|
||||
@@ -71,6 +75,31 @@ class LiveChat extends EventEmitter {
|
||||
this.#actions = video_info.actions;
|
||||
this.#continuation = video_info.livechat?.continuation;
|
||||
this.is_replay = video_info.livechat?.is_replay || false;
|
||||
this.smoothed_queue = new SmoothedQueue();
|
||||
|
||||
this.smoothed_queue.callback = async (actions: YTNode[]) => {
|
||||
if (!actions.length) {
|
||||
// Wait 2 seconds before requesting an incremental continuation if the action group is empty.
|
||||
await this.#wait(2000);
|
||||
} else if (actions.length < 10) {
|
||||
// If there are less than 10 actions, wait until all of them are emitted.
|
||||
await this.#emitSmoothedActions(actions);
|
||||
} else if (this.is_replay) {
|
||||
/**
|
||||
* NOTE: Live chat replays require data from the video player for actions to be emitted timely
|
||||
* and as we don't have that, this ends up being quite innacurate.
|
||||
*/
|
||||
this.#emitSmoothedActions(actions);
|
||||
await this.#wait(2000);
|
||||
} else {
|
||||
// There are more than 10 actions, emit them asynchonously so we can request the next incremental continuation.
|
||||
this.#emitSmoothedActions(actions);
|
||||
}
|
||||
|
||||
if (this.running) {
|
||||
this.#pollLivechat();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
on(type: 'start', listener: (initial_data: LiveChatContinuation) => void): void;
|
||||
@@ -82,6 +111,15 @@ class LiveChat extends EventEmitter {
|
||||
super.on(type, listener);
|
||||
}
|
||||
|
||||
once(type: 'start', listener: (initial_data: LiveChatContinuation) => void): void;
|
||||
once(type: 'chat-update', listener: (action: ChatAction) => void): void;
|
||||
once(type: 'metadata-update', listener: (metadata: LiveMetadata) => void): void;
|
||||
once(type: 'error', listener: (err: Error) => void): void;
|
||||
once(type: 'end', listener: () => void): void;
|
||||
once(type: string, listener: (...args: any[]) => void): void {
|
||||
super.once(type, listener);
|
||||
}
|
||||
|
||||
start() {
|
||||
if (!this.running) {
|
||||
this.running = true;
|
||||
@@ -91,17 +129,25 @@ class LiveChat extends EventEmitter {
|
||||
}
|
||||
|
||||
stop() {
|
||||
this.smoothed_queue.clear();
|
||||
this.running = false;
|
||||
}
|
||||
|
||||
#pollLivechat() {
|
||||
(async () => {
|
||||
try {
|
||||
const endpoint = this.is_replay ? 'live_chat/get_live_chat_replay' : 'live_chat/get_live_chat';
|
||||
const response = await this.#actions.execute(endpoint, { continuation: this.#continuation });
|
||||
const response = await this.#actions.execute(
|
||||
this.is_replay ? 'live_chat/get_live_chat_replay' : 'live_chat/get_live_chat',
|
||||
{ continuation: this.#continuation, parse: true }
|
||||
);
|
||||
|
||||
const data = Parser.parseResponse(response.data);
|
||||
const contents = data.continuation_contents;
|
||||
const contents = response.continuation_contents;
|
||||
|
||||
if (!contents) {
|
||||
this.emit('error', new InnertubeError('Unexpected live chat incremental continuation response', response));
|
||||
this.emit('end');
|
||||
this.stop();
|
||||
}
|
||||
|
||||
if (!(contents instanceof LiveChatContinuation)) {
|
||||
this.stop();
|
||||
@@ -115,31 +161,31 @@ class LiveChat extends EventEmitter {
|
||||
if (contents.header) {
|
||||
this.initial_info = contents;
|
||||
this.emit('start', contents);
|
||||
if (this.running)
|
||||
this.#pollLivechat();
|
||||
} else {
|
||||
await this.#emitSmoothedActions(contents.actions);
|
||||
this.smoothed_queue.enqueueActionGroup(contents.actions);
|
||||
}
|
||||
|
||||
/**
|
||||
* If there are no actions then we wait 1000 milliseconds, otherwise
|
||||
* the amount of items on the action queue will determine the polling interval.
|
||||
*/
|
||||
if (!contents.actions.length && !contents.header)
|
||||
await this.#wait(1000);
|
||||
|
||||
if (this.running)
|
||||
this.#pollLivechat();
|
||||
this.#retry_count = 0;
|
||||
} catch (err) {
|
||||
this.emit('error', new InnertubeError('Failed to poll livechat, retrying...', err));
|
||||
await this.#wait(2000);
|
||||
if (this.running)
|
||||
this.emit('error', err);
|
||||
|
||||
if (this.#retry_count++ < 10) {
|
||||
await this.#wait(2000);
|
||||
this.#pollLivechat();
|
||||
} else {
|
||||
this.emit('error', new InnertubeError('Reached retry limit for incremental continuation requests', err));
|
||||
this.emit('end');
|
||||
this.stop();
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures actions are emitted at the right speed.
|
||||
* This was adapted from YouTube's compiled code (Android & Web).
|
||||
* This and {@link SmoothedQueue} were based off of YouTube's own implementation.
|
||||
*/
|
||||
async #emitSmoothedActions(action_queue: YTNode[]) {
|
||||
const base = 1E4;
|
||||
@@ -192,7 +238,6 @@ class LiveChat extends EventEmitter {
|
||||
if (this.running)
|
||||
this.#pollMetadata();
|
||||
} catch (err) {
|
||||
this.emit('error', new InnertubeError('Failed to poll live metadata, retrying...', err));
|
||||
await this.#wait(2000);
|
||||
if (this.running)
|
||||
this.#pollMetadata();
|
||||
@@ -209,11 +254,12 @@ class LiveChat extends EventEmitter {
|
||||
params: Proto.encodeMessageParams(this.#channel_id, this.#video_id),
|
||||
richMessage: { textSegments: [ { text } ] },
|
||||
clientMessageId: uuidv4(),
|
||||
client: 'ANDROID',
|
||||
parse: true
|
||||
});
|
||||
|
||||
if (!response.actions)
|
||||
throw new InnertubeError('Response did not have an "actions" property. The call may have failed.');
|
||||
throw new InnertubeError('Unexpected response from send_message', response);
|
||||
|
||||
return response.actions.array().as(AddChatItemAction);
|
||||
}
|
||||
|
||||
159
src/parser/youtube/SmoothedQueue.ts
Normal file
159
src/parser/youtube/SmoothedQueue.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { YTNode } from '../helpers';
|
||||
|
||||
/**
|
||||
* Flattens the given queue.
|
||||
* @param queue - The queue to flatten.
|
||||
*/
|
||||
function flattenQueue(queue: YTNode[][]) {
|
||||
const nodes: YTNode[] = [];
|
||||
|
||||
for (const group of queue) {
|
||||
if (Array.isArray(group)) {
|
||||
for (const node of group) {
|
||||
nodes.push(node);
|
||||
}
|
||||
} else {
|
||||
nodes.push(group);
|
||||
}
|
||||
}
|
||||
|
||||
return nodes;
|
||||
}
|
||||
|
||||
class DelayQueue {
|
||||
front: number[];
|
||||
back: number[];
|
||||
|
||||
constructor() {
|
||||
this.front = [];
|
||||
this.back = [];
|
||||
}
|
||||
|
||||
public isEmpty(): boolean {
|
||||
return !this.front.length && !this.back.length;
|
||||
}
|
||||
|
||||
public clear(): void {
|
||||
this.front = [];
|
||||
this.back = [];
|
||||
}
|
||||
|
||||
public getValues(): number[] {
|
||||
return this.front.concat(this.back.reverse());
|
||||
}
|
||||
}
|
||||
|
||||
class SmoothedQueue {
|
||||
#last_update_time: number | null;
|
||||
#estimated_update_interval: number | null;
|
||||
#callback: Function | null;
|
||||
#action_queue: YTNode[][];
|
||||
#next_update_id: any;
|
||||
#poll_response_delay_queue: DelayQueue;
|
||||
|
||||
constructor() {
|
||||
this.#last_update_time = null;
|
||||
this.#estimated_update_interval = null;
|
||||
this.#callback = null;
|
||||
this.#action_queue = [];
|
||||
this.#next_update_id = null;
|
||||
this.#poll_response_delay_queue = new DelayQueue();
|
||||
}
|
||||
|
||||
public enqueueActionGroup(group: YTNode[]): void {
|
||||
if (this.#last_update_time !== null) {
|
||||
const delay = Date.now() - this.#last_update_time;
|
||||
|
||||
this.#poll_response_delay_queue.back.push(delay);
|
||||
|
||||
if (5 < (this.#poll_response_delay_queue.front.length + this.#poll_response_delay_queue.back.length)) {
|
||||
if (!this.#poll_response_delay_queue.front.length) {
|
||||
this.#poll_response_delay_queue.front = this.#poll_response_delay_queue.back;
|
||||
this.#poll_response_delay_queue.front.reverse();
|
||||
this.#poll_response_delay_queue.back = [];
|
||||
}
|
||||
|
||||
this.#poll_response_delay_queue.front.pop();
|
||||
}
|
||||
|
||||
this.#estimated_update_interval = Math.max(...this.#poll_response_delay_queue.getValues());
|
||||
}
|
||||
|
||||
this.#last_update_time = Date.now();
|
||||
|
||||
this.#action_queue.push(group);
|
||||
|
||||
if (this.#next_update_id === null) {
|
||||
this.#next_update_id = setTimeout(this.emitSmoothedActions.bind(this));
|
||||
}
|
||||
}
|
||||
|
||||
public emitSmoothedActions(): void {
|
||||
this.#next_update_id = null;
|
||||
|
||||
if (this.#action_queue.length) {
|
||||
let delay = 1E4;
|
||||
|
||||
if (this.#estimated_update_interval !== null && this.#last_update_time !== null) {
|
||||
delay = this.#estimated_update_interval - Date.now() + this.#last_update_time;
|
||||
}
|
||||
|
||||
delay = this.#action_queue.length < delay / 80 ? 1 : Math.ceil(this.#action_queue.length / (delay / 80));
|
||||
|
||||
const actions = flattenQueue(this.#action_queue.splice(0, delay));
|
||||
|
||||
if (this.#callback) {
|
||||
this.#callback(actions);
|
||||
}
|
||||
|
||||
if (this.#action_queue !== null) {
|
||||
delay == 1 ? (
|
||||
delay = this.#estimated_update_interval as number / this.#action_queue.length,
|
||||
delay *= Math.random() + 0.5,
|
||||
delay = Math.min(1E3, delay),
|
||||
delay = Math.max(80, delay)
|
||||
) : delay = 80;
|
||||
|
||||
this.#next_update_id = setTimeout(this.emitSmoothedActions.bind(this), delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public clear() {
|
||||
if (this.#next_update_id !== null) {
|
||||
clearTimeout(this.#next_update_id);
|
||||
this.#next_update_id = null;
|
||||
}
|
||||
this.#action_queue = [];
|
||||
}
|
||||
|
||||
set callback(cb: Function | null) {
|
||||
this.#callback = cb;
|
||||
}
|
||||
|
||||
get callback(): Function | null {
|
||||
return this.#callback;
|
||||
}
|
||||
|
||||
get action_queue(): YTNode[][] {
|
||||
return this.#action_queue;
|
||||
}
|
||||
|
||||
get estimated_update_interval(): number | null {
|
||||
return this.#estimated_update_interval;
|
||||
}
|
||||
|
||||
get last_update_time(): number | null {
|
||||
return this.#last_update_time;
|
||||
}
|
||||
|
||||
get next_update_id(): any {
|
||||
return this.#next_update_id;
|
||||
}
|
||||
|
||||
get poll_response_delay_queue(): DelayQueue {
|
||||
return this.#poll_response_delay_queue;
|
||||
}
|
||||
}
|
||||
|
||||
export default SmoothedQueue;
|
||||
Reference in New Issue
Block a user