refactor: split remaining monitor runtime helpers

This commit is contained in:
Peter Steinberger 2026-03-17 21:27:14 -07:00
parent 4e94f3aa02
commit b86bc9de95
No known key found for this signature in database
19 changed files with 1825 additions and 1676 deletions

View File

@ -2,7 +2,7 @@ import { ChannelType } from "discord-api-types/v10";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { NativeCommandSpec } from "../../../../src/auto-reply/commands-registry.js";
import * as dispatcherModule from "../../../../src/auto-reply/reply/provider-dispatcher.js";
import type { ChatType } from "../../../../src/channels/chat-type.js";
import { setDefaultChannelPluginRegistryForTests } from "../../../../src/commands/channel-test-helpers.js";
import type { OpenClawConfig } from "../../../../src/config/config.js";
import * as pluginCommandsModule from "../../../../src/plugins/commands.js";
import { clearPluginCommands, registerPluginCommand } from "../../../../src/plugins/commands.js";
@ -12,32 +12,26 @@ import {
} from "./native-command.test-helpers.js";
import { createNoopThreadBindingManager } from "./thread-bindings.js";
type ResolveConfiguredBindingRouteFn =
typeof import("openclaw/plugin-sdk/conversation-runtime").resolveConfiguredBindingRoute;
type EnsureConfiguredBindingRouteReadyFn =
typeof import("openclaw/plugin-sdk/conversation-runtime").ensureConfiguredBindingRouteReady;
const persistentBindingMocks = vi.hoisted(() => ({
resolveConfiguredAcpBindingRecord: vi.fn<ResolveConfiguredBindingRouteFn>((params) => ({
bindingResolution: null,
route: params.route,
})),
ensureConfiguredAcpBindingSession: vi.fn<EnsureConfiguredBindingRouteReadyFn>(async () => ({
const ensureConfiguredBindingRouteReadyMock = vi.hoisted(() =>
vi.fn<EnsureConfiguredBindingRouteReadyFn>(async () => ({
ok: true,
})),
}));
);
vi.mock("openclaw/plugin-sdk/conversation-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/conversation-runtime")>();
return {
...actual,
resolveConfiguredBindingRoute: persistentBindingMocks.resolveConfiguredAcpBindingRecord,
ensureConfiguredBindingRouteReady: persistentBindingMocks.ensureConfiguredAcpBindingSession,
ensureConfiguredBindingRouteReady: (...args: unknown[]) =>
ensureConfiguredBindingRouteReadyMock(
...(args as Parameters<EnsureConfiguredBindingRouteReadyFn>),
),
};
});
import { createDiscordNativeCommand } from "./native-command.js";
function createInteraction(params?: {
channelType?: ChannelType;
channelId?: string;
@ -66,7 +60,12 @@ function createConfig(): OpenClawConfig {
} as OpenClawConfig;
}
function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec) {
async function loadCreateDiscordNativeCommand() {
return (await import("./native-command.js")).createDiscordNativeCommand;
}
async function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec) {
const createDiscordNativeCommand = await loadCreateDiscordNativeCommand();
return createDiscordNativeCommand({
command: commandSpec,
cfg,
@ -78,7 +77,8 @@ function createNativeCommand(cfg: OpenClawConfig, commandSpec: NativeCommandSpec
});
}
function createPluginCommand(params: { cfg: OpenClawConfig; name: string }) {
async function createPluginCommand(params: { cfg: OpenClawConfig; name: string }) {
const createDiscordNativeCommand = await loadCreateDiscordNativeCommand();
return createDiscordNativeCommand({
command: {
name: params.name,
@ -119,7 +119,7 @@ async function expectPairCommandReply(params: {
commandName: string;
interaction: MockCommandInteraction;
}) {
const command = createPluginCommand({
const command = await createPluginCommand({
cfg: params.cfg,
name: params.commandName,
});
@ -143,150 +143,14 @@ async function expectPairCommandReply(params: {
);
}
function createStatusCommand(cfg: OpenClawConfig) {
return createNativeCommand(cfg, {
async function createStatusCommand(cfg: OpenClawConfig) {
return await createNativeCommand(cfg, {
name: "status",
description: "Status",
acceptsArgs: false,
});
}
function resolveConversationFromParams(params: Parameters<ResolveConfiguredBindingRouteFn>[0]) {
if ("conversation" in params) {
return params.conversation;
}
return {
channel: params.channel,
accountId: params.accountId,
conversationId: params.conversationId,
...(params.parentConversationId ? { parentConversationId: params.parentConversationId } : {}),
};
}
function createConfiguredBindingResolution(params: {
conversation: ReturnType<typeof resolveConversationFromParams>;
boundSessionKey: string;
}) {
const peerKind: ChatType = params.conversation.conversationId.startsWith("dm-")
? "direct"
: "channel";
const configuredBinding = {
spec: {
channel: "discord" as const,
accountId: params.conversation.accountId,
conversationId: params.conversation.conversationId,
...(params.conversation.parentConversationId
? { parentConversationId: params.conversation.parentConversationId }
: {}),
agentId: "codex",
mode: "persistent" as const,
},
record: {
bindingId: `config:acp:discord:${params.conversation.accountId}:${params.conversation.conversationId}`,
targetSessionKey: params.boundSessionKey,
targetKind: "session" as const,
conversation: params.conversation,
status: "active" as const,
boundAt: 0,
},
};
return {
conversation: params.conversation,
compiledBinding: {
channel: "discord" as const,
binding: {
type: "acp" as const,
agentId: "codex",
match: {
channel: "discord",
accountId: params.conversation.accountId,
peer: {
kind: peerKind,
id: params.conversation.conversationId,
},
},
acp: {
mode: "persistent" as const,
},
},
bindingConversationId: params.conversation.conversationId,
target: {
conversationId: params.conversation.conversationId,
...(params.conversation.parentConversationId
? { parentConversationId: params.conversation.parentConversationId }
: {}),
},
agentId: "codex",
provider: {
compileConfiguredBinding: () => ({
conversationId: params.conversation.conversationId,
...(params.conversation.parentConversationId
? { parentConversationId: params.conversation.parentConversationId }
: {}),
}),
matchInboundConversation: () => ({
conversationId: params.conversation.conversationId,
...(params.conversation.parentConversationId
? { parentConversationId: params.conversation.parentConversationId }
: {}),
}),
},
targetFactory: {
driverId: "acp" as const,
materialize: () => ({
record: configuredBinding.record,
statefulTarget: {
kind: "stateful" as const,
driverId: "acp",
sessionKey: params.boundSessionKey,
agentId: "codex",
},
}),
},
},
match: {
conversationId: params.conversation.conversationId,
...(params.conversation.parentConversationId
? { parentConversationId: params.conversation.parentConversationId }
: {}),
},
record: configuredBinding.record,
statefulTarget: {
kind: "stateful" as const,
driverId: "acp",
sessionKey: params.boundSessionKey,
agentId: "codex",
},
};
}
function setConfiguredBinding(channelId: string, boundSessionKey: string) {
persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockImplementation((params) => {
const conversation = resolveConversationFromParams(params);
const bindingResolution = createConfiguredBindingResolution({
conversation: {
...conversation,
conversationId: channelId,
},
boundSessionKey,
});
return {
bindingResolution,
boundSessionKey,
boundAgentId: "codex",
route: {
...params.route,
agentId: "codex",
sessionKey: boundSessionKey,
matchedBy: "binding.channel",
},
};
});
persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({
ok: true,
});
}
function createDispatchSpy() {
return vi.spyOn(dispatcherModule, "dispatchReplyWithDispatcher").mockResolvedValue({
counts: {
@ -299,26 +163,23 @@ function createDispatchSpy() {
function expectBoundSessionDispatch(
dispatchSpy: ReturnType<typeof createDispatchSpy>,
boundSessionKey: string,
expectedPattern: RegExp,
) {
expect(dispatchSpy).toHaveBeenCalledTimes(1);
const dispatchCall = dispatchSpy.mock.calls[0]?.[0] as {
ctx?: { SessionKey?: string; CommandTargetSessionKey?: string };
};
expect(dispatchCall.ctx?.SessionKey).toBe(boundSessionKey);
expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe(boundSessionKey);
expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1);
expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).toHaveBeenCalledTimes(1);
expect(dispatchCall.ctx?.SessionKey).toMatch(expectedPattern);
expect(dispatchCall.ctx?.CommandTargetSessionKey).toMatch(expectedPattern);
expect(ensureConfiguredBindingRouteReadyMock).toHaveBeenCalledTimes(1);
}
async function expectBoundStatusCommandDispatch(params: {
cfg: OpenClawConfig;
interaction: MockCommandInteraction;
channelId: string;
boundSessionKey: string;
expectedPattern: RegExp;
}) {
const command = createStatusCommand(params.cfg);
setConfiguredBinding(params.channelId, params.boundSessionKey);
const command = await createStatusCommand(params.cfg);
vi.spyOn(pluginCommandsModule, "matchPluginCommand").mockReturnValue(null);
const dispatchSpy = createDispatchSpy();
@ -327,20 +188,16 @@ async function expectBoundStatusCommandDispatch(params: {
params.interaction as unknown,
);
expectBoundSessionDispatch(dispatchSpy, params.boundSessionKey);
expectBoundSessionDispatch(dispatchSpy, params.expectedPattern);
}
describe("Discord native plugin command dispatch", () => {
beforeEach(() => {
vi.clearAllMocks();
clearPluginCommands();
persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockReset();
persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockImplementation((params) => ({
bindingResolution: null,
route: params.route,
}));
persistentBindingMocks.ensureConfiguredAcpBindingSession.mockReset();
persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({
setDefaultChannelPluginRegistryForTests();
ensureConfiguredBindingRouteReadyMock.mockReset();
ensureConfiguredBindingRouteReadyMock.mockResolvedValue({
ok: true,
});
});
@ -397,15 +254,7 @@ describe("Discord native plugin command dispatch", () => {
description: "Pair",
acceptsArgs: true,
};
const command = createDiscordNativeCommand({
command: commandSpec,
cfg,
discordConfig: cfg.channels?.discord ?? {},
accountId: "default",
sessionPrefix: "discord:slash",
ephemeralDefault: true,
threadBindings: createNoopThreadBindingManager("default"),
});
const command = await createNativeCommand(cfg, commandSpec);
const interaction = createInteraction({
channelType: ChannelType.GuildText,
channelId: "234567890123456789",
@ -449,15 +298,7 @@ describe("Discord native plugin command dispatch", () => {
description: "List cron jobs",
acceptsArgs: false,
};
const command = createDiscordNativeCommand({
command: commandSpec,
cfg,
discordConfig: cfg.channels?.discord ?? {},
accountId: "default",
sessionPrefix: "discord:slash",
ephemeralDefault: true,
threadBindings: createNoopThreadBindingManager("default"),
});
const command = await createNativeCommand(cfg, commandSpec);
const interaction = createInteraction();
const pluginMatch = {
command: {
@ -492,11 +333,21 @@ describe("Discord native plugin command dispatch", () => {
it("routes native slash commands through configured ACP Discord channel bindings", async () => {
const guildId = "1459246755253325866";
const channelId = "1478836151241412759";
const boundSessionKey = "agent:codex:acp:binding:discord:default:feedface";
const cfg = {
commands: {
useAccessGroups: false,
},
channels: {
discord: {
guilds: {
[guildId]: {
channels: {
[channelId]: { allow: true, requireMention: false },
},
},
},
},
},
bindings: [
{
type: "acp",
@ -522,8 +373,7 @@ describe("Discord native plugin command dispatch", () => {
await expectBoundStatusCommandDispatch({
cfg,
interaction,
channelId,
boundSessionKey,
expectedPattern: /^agent:codex:acp:binding:discord:default:/,
});
});
@ -557,7 +407,7 @@ describe("Discord native plugin command dispatch", () => {
},
},
} as OpenClawConfig;
const command = createStatusCommand(cfg);
const command = await createStatusCommand(cfg);
const interaction = createInteraction({
channelType: ChannelType.GuildText,
channelId,
@ -578,13 +428,11 @@ describe("Discord native plugin command dispatch", () => {
expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe(
"agent:qwen:discord:channel:1478836151241412759",
);
expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1);
expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).not.toHaveBeenCalled();
expect(ensureConfiguredBindingRouteReadyMock).not.toHaveBeenCalled();
});
it("routes Discord DM native slash commands through configured ACP bindings", async () => {
const channelId = "dm-1";
const boundSessionKey = "agent:codex:acp:binding:discord:default:dmfeedface";
const cfg = {
commands: {
useAccessGroups: false,
@ -617,15 +465,13 @@ describe("Discord native plugin command dispatch", () => {
await expectBoundStatusCommandDispatch({
cfg,
interaction,
channelId,
boundSessionKey,
expectedPattern: /^agent:codex:acp:binding:discord:default:/,
});
});
it("allows recovery commands through configured ACP bindings even when ensure fails", async () => {
const guildId = "1459246755253325866";
const channelId = "1479098716916023408";
const boundSessionKey = "agent:codex:acp:binding:discord:default:feedface";
const cfg = {
commands: {
useAccessGroups: false,
@ -651,14 +497,13 @@ describe("Discord native plugin command dispatch", () => {
guildId,
guildName: "Ops",
});
const command = createNativeCommand(cfg, {
const command = await createNativeCommand(cfg, {
name: "new",
description: "Start a new session.",
acceptsArgs: true,
});
setConfiguredBinding(channelId, boundSessionKey);
persistentBindingMocks.ensureConfiguredAcpBindingSession.mockResolvedValue({
ensureConfiguredBindingRouteReadyMock.mockResolvedValue({
ok: false,
error: "acpx exited with code 1",
});
@ -671,10 +516,11 @@ describe("Discord native plugin command dispatch", () => {
const dispatchCall = dispatchSpy.mock.calls[0]?.[0] as {
ctx?: { SessionKey?: string; CommandTargetSessionKey?: string };
};
expect(dispatchCall.ctx?.SessionKey).toBe(boundSessionKey);
expect(dispatchCall.ctx?.CommandTargetSessionKey).toBe(boundSessionKey);
expect(persistentBindingMocks.resolveConfiguredAcpBindingRecord).toHaveBeenCalledTimes(1);
expect(persistentBindingMocks.ensureConfiguredAcpBindingSession).not.toHaveBeenCalled();
expect(dispatchCall.ctx?.SessionKey).toMatch(/^agent:codex:acp:binding:discord:default:/);
expect(dispatchCall.ctx?.CommandTargetSessionKey).toMatch(
/^agent:codex:acp:binding:discord:default:/,
);
expect(ensureConfiguredBindingRouteReadyMock).not.toHaveBeenCalled();
expect(interaction.reply).not.toHaveBeenCalledWith(
expect.objectContaining({
content: "Configured ACP binding is unavailable right now. Please try again.",

View File

@ -7,7 +7,6 @@ import {
baseRuntime,
getFirstDiscordMessageHandlerParams,
getProviderMonitorTestMocks,
mockResolvedDiscordAccountConfig,
resetDiscordProviderMonitorMocks,
} from "../../../../test/helpers/extensions/discord-provider.test-support.js";
@ -37,6 +36,21 @@ const {
voiceRuntimeModuleLoadedMock,
} = getProviderMonitorTestMocks();
function createConfigWithDiscordAccount(overrides: Record<string, unknown> = {}): OpenClawConfig {
return {
channels: {
discord: {
accounts: {
default: {
token: "MTIz.abc.def",
...overrides,
},
},
},
},
} as OpenClawConfig;
}
vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/plugin-runtime")>(
"openclaw/plugin-sdk/plugin-runtime",
@ -90,7 +104,18 @@ describe("monitorDiscordProvider", () => {
};
beforeEach(() => {
vi.resetModules();
resetDiscordProviderMonitorMocks();
vi.doMock("../accounts.js", () => ({
resolveDiscordAccount: (...args: Parameters<typeof resolveDiscordAccountMock>) =>
resolveDiscordAccountMock(...args),
}));
vi.doMock("../probe.js", () => ({
fetchDiscordApplicationId: async () => "app-1",
}));
vi.doMock("../token.js", () => ({
normalizeDiscordToken: (value?: string) => value,
}));
});
it("stops thread bindings when startup fails before lifecycle begins", async () => {
@ -139,7 +164,7 @@ describe("monitorDiscordProvider", () => {
it("loads the Discord voice runtime only when voice is enabled", async () => {
resolveDiscordAccountMock.mockReturnValue({
accountId: "default",
token: "cfg-token",
token: "MTIz.abc.def",
config: {
commands: { native: true, nativeSkills: false },
voice: { enabled: true },
@ -356,11 +381,18 @@ describe("monitorDiscordProvider", () => {
});
it("forwards custom eventQueue config from discord config to Carbon Client", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
mockResolvedDiscordAccountConfig({
eventQueue: { listenerTimeout: 300_000 },
resolveDiscordAccountMock.mockReturnValue({
accountId: "default",
token: "MTIz.abc.def",
config: {
commands: { native: true, nativeSkills: false },
voice: { enabled: false },
agentComponents: { enabled: false },
execApprovals: { enabled: false },
eventQueue: { listenerTimeout: 300_000 },
},
});
const { monitorDiscordProvider } = await import("./provider.js");
await monitorDiscordProvider({
config: baseConfig(),
@ -374,12 +406,10 @@ describe("monitorDiscordProvider", () => {
it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
mockResolvedDiscordAccountConfig({
eventQueue: { listenerTimeout: 50_000 },
});
await monitorDiscordProvider({
config: baseConfig(),
config: createConfigWithDiscordAccount({
eventQueue: { listenerTimeout: 50_000 },
}),
runtime: baseRuntime(),
});
@ -392,11 +422,18 @@ describe("monitorDiscordProvider", () => {
});
it("forwards inbound worker timeout config to the Discord message handler", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
mockResolvedDiscordAccountConfig({
inboundWorker: { runTimeoutMs: 300_000 },
resolveDiscordAccountMock.mockReturnValue({
accountId: "default",
token: "MTIz.abc.def",
config: {
commands: { native: true, nativeSkills: false },
voice: { enabled: false },
agentComponents: { enabled: false },
execApprovals: { enabled: false },
inboundWorker: { runTimeoutMs: 300_000 },
},
});
const { monitorDiscordProvider } = await import("./provider.js");
await monitorDiscordProvider({
config: baseConfig(),

View File

@ -35,13 +35,16 @@ export type {
ChannelMessageActionAdapter,
ChannelMessageActionName,
} from "openclaw/plugin-sdk/channel-runtime";
export { withNormalizedTimestamp } from "../../../src/agents/date-time.js";
export { assertMediaNotDataUrl } from "../../../src/agents/sandbox-paths.js";
export { parseAvailableTags, readReactionParams } from "openclaw/plugin-sdk/discord-core";
export { resolvePollMaxSelections } from "../../../src/polls.js";
export type { DiscordAccountConfig, DiscordActionConfig } from "../../../src/config/types.js";
export {
assertMediaNotDataUrl,
parseAvailableTags,
readReactionParams,
resolvePollMaxSelections,
withNormalizedTimestamp,
} from "openclaw/plugin-sdk/discord-core";
export type { DiscordAccountConfig, DiscordActionConfig } from "openclaw/plugin-sdk/discord";
export {
hasConfiguredSecretInput,
normalizeResolvedSecretInputString,
normalizeSecretInputString,
} from "../../../src/config/types.secrets.js";
} from "openclaw/plugin-sdk/config-runtime";

View File

@ -0,0 +1,471 @@
import type { ClawdbotConfig } from "../runtime-api.js";
import { normalizeFeishuExternalKey } from "./external-keys.js";
import { downloadMessageResourceFeishu } from "./media.js";
import { parsePostContent } from "./post.js";
import { getFeishuRuntime } from "./runtime.js";
import type { FeishuMediaInfo } from "./types.js";
export type FeishuMention = {
key: string;
id: {
open_id?: string;
user_id?: string;
union_id?: string;
};
name: string;
tenant_key?: string;
};
type FeishuMessageLike = {
message: {
content: string;
message_type: string;
mentions?: FeishuMention[];
chat_id: string;
root_id?: string;
parent_id?: string;
thread_id?: string;
message_id: string;
};
sender: {
sender_id: {
open_id?: string;
user_id?: string;
};
};
};
export type GroupSessionScope = "group" | "group_sender" | "group_topic" | "group_topic_sender";
export type ResolvedFeishuGroupSession = {
peerId: string;
parentPeer: { kind: "group"; id: string } | null;
groupSessionScope: GroupSessionScope;
replyInThread: boolean;
threadReply: boolean;
};
function buildFeishuConversationId(params: {
chatId: string;
scope: GroupSessionScope | "group_sender";
topicId?: string;
senderOpenId?: string;
}): string {
switch (params.scope) {
case "group_sender":
return `${params.chatId}:sender:${params.senderOpenId}`;
case "group_topic":
return `${params.chatId}:topic:${params.topicId}`;
case "group_topic_sender":
return `${params.chatId}:topic:${params.topicId}:sender:${params.senderOpenId}`;
default:
return params.chatId;
}
}
export function resolveFeishuGroupSession(params: {
chatId: string;
senderOpenId: string;
messageId: string;
rootId?: string;
threadId?: string;
groupConfig?: {
groupSessionScope?: GroupSessionScope;
topicSessionMode?: "enabled" | "disabled";
replyInThread?: "enabled" | "disabled";
};
feishuCfg?: {
groupSessionScope?: GroupSessionScope;
topicSessionMode?: "enabled" | "disabled";
replyInThread?: "enabled" | "disabled";
};
}): ResolvedFeishuGroupSession {
const { chatId, senderOpenId, messageId, rootId, threadId, groupConfig, feishuCfg } = params;
const normalizedThreadId = threadId?.trim();
const normalizedRootId = rootId?.trim();
const threadReply = Boolean(normalizedThreadId || normalizedRootId);
const replyInThread =
(groupConfig?.replyInThread ?? feishuCfg?.replyInThread ?? "disabled") === "enabled" ||
threadReply;
const legacyTopicSessionMode =
groupConfig?.topicSessionMode ?? feishuCfg?.topicSessionMode ?? "disabled";
const groupSessionScope: GroupSessionScope =
groupConfig?.groupSessionScope ??
feishuCfg?.groupSessionScope ??
(legacyTopicSessionMode === "enabled" ? "group_topic" : "group");
const topicScope =
groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender"
? (normalizedRootId ?? normalizedThreadId ?? (replyInThread ? messageId : null))
: null;
let peerId = chatId;
switch (groupSessionScope) {
case "group_sender":
peerId = buildFeishuConversationId({ chatId, scope: "group_sender", senderOpenId });
break;
case "group_topic":
peerId = topicScope
? buildFeishuConversationId({ chatId, scope: "group_topic", topicId: topicScope })
: chatId;
break;
case "group_topic_sender":
peerId = topicScope
? buildFeishuConversationId({
chatId,
scope: "group_topic_sender",
topicId: topicScope,
senderOpenId,
})
: buildFeishuConversationId({ chatId, scope: "group_sender", senderOpenId });
break;
case "group":
default:
peerId = chatId;
break;
}
return {
peerId,
parentPeer:
topicScope &&
(groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender")
? { kind: "group", id: chatId }
: null,
groupSessionScope,
replyInThread,
threadReply,
};
}
export function parseMessageContent(content: string, messageType: string): string {
if (messageType === "post") {
return parsePostContent(content).textContent;
}
try {
const parsed = JSON.parse(content);
if (messageType === "text") {
return parsed.text || "";
}
if (messageType === "share_chat") {
if (parsed && typeof parsed === "object") {
const share = parsed as { body?: unknown; summary?: unknown; share_chat_id?: unknown };
if (typeof share.body === "string" && share.body.trim()) {
return share.body.trim();
}
if (typeof share.summary === "string" && share.summary.trim()) {
return share.summary.trim();
}
if (typeof share.share_chat_id === "string" && share.share_chat_id.trim()) {
return `[Forwarded message: ${share.share_chat_id.trim()}]`;
}
}
return "[Forwarded message]";
}
if (messageType === "merge_forward") {
return "[Merged and Forwarded Message - loading...]";
}
return content;
} catch {
return content;
}
}
function formatSubMessageContent(content: string, contentType: string): string {
try {
const parsed = JSON.parse(content);
switch (contentType) {
case "text":
return parsed.text || content;
case "post":
return parsePostContent(content).textContent;
case "image":
return "[Image]";
case "file":
return `[File: ${parsed.file_name || "unknown"}]`;
case "audio":
return "[Audio]";
case "video":
return "[Video]";
case "sticker":
return "[Sticker]";
case "merge_forward":
return "[Nested Merged Forward]";
default:
return `[${contentType}]`;
}
} catch {
return content;
}
}
export function parseMergeForwardContent(params: {
content: string;
log?: (...args: any[]) => void;
}): string {
const { content, log } = params;
const maxMessages = 50;
log?.("feishu: parsing merge_forward sub-messages from API response");
let items: Array<{
message_id?: string;
msg_type?: string;
body?: { content?: string };
sender?: { id?: string };
upper_message_id?: string;
create_time?: string;
}>;
try {
items = JSON.parse(content);
} catch {
log?.("feishu: merge_forward items parse failed");
return "[Merged and Forwarded Message - parse error]";
}
if (!Array.isArray(items) || items.length === 0) {
return "[Merged and Forwarded Message - no sub-messages]";
}
const subMessages = items.filter((item) => item.upper_message_id);
if (subMessages.length === 0) {
return "[Merged and Forwarded Message - no sub-messages found]";
}
log?.(`feishu: merge_forward contains ${subMessages.length} sub-messages`);
subMessages.sort(
(a, b) => parseInt(a.create_time || "0", 10) - parseInt(b.create_time || "0", 10),
);
const lines = ["[Merged and Forwarded Messages]"];
for (const item of subMessages.slice(0, maxMessages)) {
lines.push(`- ${formatSubMessageContent(item.body?.content || "", item.msg_type || "text")}`);
}
if (subMessages.length > maxMessages) {
lines.push(`... and ${subMessages.length - maxMessages} more messages`);
}
return lines.join("\n");
}
export function checkBotMentioned(event: FeishuMessageLike, botOpenId?: string): boolean {
if (!botOpenId) {
return false;
}
if ((event.message.content ?? "").includes("@_all")) {
return true;
}
const mentions = event.message.mentions ?? [];
if (mentions.length > 0) {
return mentions.some((mention) => mention.id.open_id === botOpenId);
}
if (event.message.message_type === "post") {
return parsePostContent(event.message.content).mentionedOpenIds.some((id) => id === botOpenId);
}
return false;
}
export function normalizeMentions(
text: string,
mentions?: FeishuMention[],
botStripId?: string,
): string {
if (!mentions || mentions.length === 0) {
return text;
}
const escaped = (value: string) => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const escapeName = (value: string) => value.replace(/</g, "&lt;").replace(/>/g, "&gt;");
let result = text;
for (const mention of mentions) {
const mentionId = mention.id.open_id;
const replacement =
botStripId && mentionId === botStripId
? ""
: mentionId
? `<at user_id="${mentionId}">${escapeName(mention.name)}</at>`
: `@${mention.name}`;
result = result.replace(new RegExp(escaped(mention.key), "g"), () => replacement).trim();
}
return result;
}
export function normalizeFeishuCommandProbeBody(text: string): string {
if (!text) {
return "";
}
return text
.replace(/<at\b[^>]*>[^<]*<\/at>/giu, " ")
.replace(/(^|\s)@[^/\s]+(?=\s|$|\/)/gu, "$1")
.replace(/\s+/g, " ")
.trim();
}
export function parseMediaKeys(
content: string,
messageType: string,
): { imageKey?: string; fileKey?: string; fileName?: string } {
try {
const parsed = JSON.parse(content);
const imageKey = normalizeFeishuExternalKey(parsed.image_key);
const fileKey = normalizeFeishuExternalKey(parsed.file_key);
switch (messageType) {
case "image":
return { imageKey, fileName: parsed.file_name };
case "file":
case "audio":
case "sticker":
return { fileKey, fileName: parsed.file_name };
case "video":
case "media":
return { fileKey, imageKey, fileName: parsed.file_name };
default:
return {};
}
} catch {
return {};
}
}
export function toMessageResourceType(messageType: string): "image" | "file" {
return messageType === "image" ? "image" : "file";
}
function inferPlaceholder(messageType: string): string {
switch (messageType) {
case "image":
return "<media:image>";
case "file":
return "<media:document>";
case "audio":
return "<media:audio>";
case "video":
case "media":
return "<media:video>";
case "sticker":
return "<media:sticker>";
default:
return "<media:document>";
}
}
export async function resolveFeishuMediaList(params: {
cfg: ClawdbotConfig;
messageId: string;
messageType: string;
content: string;
maxBytes: number;
log?: (msg: string) => void;
accountId?: string;
}): Promise<FeishuMediaInfo[]> {
const { cfg, messageId, messageType, content, maxBytes, log, accountId } = params;
const mediaTypes = ["image", "file", "audio", "video", "media", "sticker", "post"];
if (!mediaTypes.includes(messageType)) {
return [];
}
const out: FeishuMediaInfo[] = [];
const core = getFeishuRuntime();
if (messageType === "post") {
const { imageKeys, mediaKeys } = parsePostContent(content);
if (imageKeys.length === 0 && mediaKeys.length === 0) {
return [];
}
if (imageKeys.length > 0) {
log?.(`feishu: post message contains ${imageKeys.length} embedded image(s)`);
}
if (mediaKeys.length > 0) {
log?.(`feishu: post message contains ${mediaKeys.length} embedded media file(s)`);
}
for (const imageKey of imageKeys) {
try {
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey: imageKey,
type: "image",
accountId,
});
const contentType =
result.contentType ?? (await core.media.detectMime({ buffer: result.buffer }));
const saved = await core.channel.media.saveMediaBuffer(
result.buffer,
contentType,
"inbound",
maxBytes,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:image>",
});
log?.(`feishu: downloaded embedded image ${imageKey}, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download embedded image ${imageKey}: ${String(err)}`);
}
}
for (const media of mediaKeys) {
try {
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey: media.fileKey,
type: "file",
accountId,
});
const contentType =
result.contentType ?? (await core.media.detectMime({ buffer: result.buffer }));
const saved = await core.channel.media.saveMediaBuffer(
result.buffer,
contentType,
"inbound",
maxBytes,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:video>",
});
log?.(`feishu: downloaded embedded media ${media.fileKey}, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download embedded media ${media.fileKey}: ${String(err)}`);
}
}
return out;
}
const mediaKeys = parseMediaKeys(content, messageType);
if (!mediaKeys.imageKey && !mediaKeys.fileKey) {
return [];
}
try {
const fileKey = mediaKeys.fileKey || mediaKeys.imageKey;
if (!fileKey) {
return [];
}
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey,
type: toMessageResourceType(messageType),
accountId,
});
const contentType =
result.contentType ?? (await core.media.detectMime({ buffer: result.buffer }));
const saved = await core.channel.media.saveMediaBuffer(
result.buffer,
contentType,
"inbound",
maxBytes,
result.fileName || mediaKeys.fileName,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: inferPlaceholder(messageType),
});
log?.(`feishu: downloaded ${messageType} media, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download ${messageType} media: ${String(err)}`);
}
return out;
}

View File

@ -22,13 +22,20 @@ import {
warnMissingProviderGroupPolicyFallbackOnce,
} from "../runtime-api.js";
import { resolveFeishuAccount } from "./accounts.js";
import {
checkBotMentioned,
normalizeFeishuCommandProbeBody,
normalizeMentions,
parseMergeForwardContent,
parseMessageContent,
resolveFeishuGroupSession,
resolveFeishuMediaList,
toMessageResourceType,
} from "./bot-content.js";
import { type FeishuPermissionError, resolveFeishuSenderName } from "./bot-sender-name.js";
import { createFeishuClient } from "./client.js";
import { buildFeishuConversationId } from "./conversation-id.js";
import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js";
import { maybeCreateDynamicAgent } from "./dynamic-agent.js";
import { normalizeFeishuExternalKey } from "./external-keys.js";
import { downloadMessageResourceFeishu } from "./media.js";
import { extractMentionTargets, isMentionForwardRequest } from "./mention.js";
import {
resolveFeishuGroupConfig,
@ -36,13 +43,14 @@ import {
resolveFeishuAllowlistMatch,
isFeishuGroupAllowed,
} from "./policy.js";
import { parsePostContent } from "./post.js";
import { createFeishuReplyDispatcher } from "./reply-dispatcher.js";
import { getFeishuRuntime } from "./runtime.js";
import { getMessageFeishu, listFeishuThreadMessages, sendMessageFeishu } from "./send.js";
import type { FeishuMessageContext, FeishuMediaInfo } from "./types.js";
import type { FeishuMessageContext } from "./types.js";
import type { DynamicAgentCreationConfig } from "./types.js";
export { toMessageResourceType } from "./bot-content.js";
// Cache permission errors to avoid spamming the user with repeated notifications.
// Key: appId or "default", Value: timestamp of last notification
const permissionErrorNotifiedAt = new Map<string, number>();
@ -91,546 +99,6 @@ export type FeishuBotAddedEvent = {
operator_tenant_key?: string;
};
type GroupSessionScope = "group" | "group_sender" | "group_topic" | "group_topic_sender";
type ResolvedFeishuGroupSession = {
peerId: string;
parentPeer: { kind: "group"; id: string } | null;
groupSessionScope: GroupSessionScope;
replyInThread: boolean;
threadReply: boolean;
};
function resolveFeishuGroupSession(params: {
chatId: string;
senderOpenId: string;
messageId: string;
rootId?: string;
threadId?: string;
groupConfig?: {
groupSessionScope?: GroupSessionScope;
topicSessionMode?: "enabled" | "disabled";
replyInThread?: "enabled" | "disabled";
};
feishuCfg?: {
groupSessionScope?: GroupSessionScope;
topicSessionMode?: "enabled" | "disabled";
replyInThread?: "enabled" | "disabled";
};
}): ResolvedFeishuGroupSession {
const { chatId, senderOpenId, messageId, rootId, threadId, groupConfig, feishuCfg } = params;
const normalizedThreadId = threadId?.trim();
const normalizedRootId = rootId?.trim();
const threadReply = Boolean(normalizedThreadId || normalizedRootId);
const replyInThread =
(groupConfig?.replyInThread ?? feishuCfg?.replyInThread ?? "disabled") === "enabled" ||
threadReply;
const legacyTopicSessionMode =
groupConfig?.topicSessionMode ?? feishuCfg?.topicSessionMode ?? "disabled";
const groupSessionScope: GroupSessionScope =
groupConfig?.groupSessionScope ??
feishuCfg?.groupSessionScope ??
(legacyTopicSessionMode === "enabled" ? "group_topic" : "group");
// Keep topic session keys stable across the "first turn creates thread" flow:
// first turn may only have message_id, while the next turn carries root_id/thread_id.
// Prefer root_id first so both turns stay on the same peer key.
const topicScope =
groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender"
? (normalizedRootId ?? normalizedThreadId ?? (replyInThread ? messageId : null))
: null;
let peerId = chatId;
switch (groupSessionScope) {
case "group_sender":
peerId = buildFeishuConversationId({
chatId,
scope: "group_sender",
senderOpenId,
});
break;
case "group_topic":
peerId = topicScope
? buildFeishuConversationId({
chatId,
scope: "group_topic",
topicId: topicScope,
})
: chatId;
break;
case "group_topic_sender":
peerId = topicScope
? buildFeishuConversationId({
chatId,
scope: "group_topic_sender",
topicId: topicScope,
senderOpenId,
})
: buildFeishuConversationId({
chatId,
scope: "group_sender",
senderOpenId,
});
break;
case "group":
default:
peerId = chatId;
break;
}
const parentPeer =
topicScope &&
(groupSessionScope === "group_topic" || groupSessionScope === "group_topic_sender")
? {
kind: "group" as const,
id: chatId,
}
: null;
return {
peerId,
parentPeer,
groupSessionScope,
replyInThread,
threadReply,
};
}
function parseMessageContent(content: string, messageType: string): string {
if (messageType === "post") {
// Extract text content from rich text post
const { textContent } = parsePostContent(content);
return textContent;
}
try {
const parsed = JSON.parse(content);
if (messageType === "text") {
return parsed.text || "";
}
if (messageType === "share_chat") {
// Preserve available summary text for merged/forwarded chat messages.
if (parsed && typeof parsed === "object") {
const share = parsed as {
body?: unknown;
summary?: unknown;
share_chat_id?: unknown;
};
if (typeof share.body === "string" && share.body.trim().length > 0) {
return share.body.trim();
}
if (typeof share.summary === "string" && share.summary.trim().length > 0) {
return share.summary.trim();
}
if (typeof share.share_chat_id === "string" && share.share_chat_id.trim().length > 0) {
return `[Forwarded message: ${share.share_chat_id.trim()}]`;
}
}
return "[Forwarded message]";
}
if (messageType === "merge_forward") {
// Return placeholder; actual content fetched asynchronously in handleFeishuMessage
return "[Merged and Forwarded Message - loading...]";
}
return content;
} catch {
return content;
}
}
/**
* Parse merge_forward message content and fetch sub-messages.
* Returns formatted text content of all sub-messages.
*/
function parseMergeForwardContent(params: {
content: string;
log?: (...args: any[]) => void;
}): string {
const { content, log } = params;
const maxMessages = 50;
// For merge_forward, the API returns all sub-messages in items array
// with upper_message_id pointing to the merge_forward message.
// The 'content' parameter here is actually the full API response items array as JSON.
log?.(`feishu: parsing merge_forward sub-messages from API response`);
let items: Array<{
message_id?: string;
msg_type?: string;
body?: { content?: string };
sender?: { id?: string };
upper_message_id?: string;
create_time?: string;
}>;
try {
items = JSON.parse(content);
} catch {
log?.(`feishu: merge_forward items parse failed`);
return "[Merged and Forwarded Message - parse error]";
}
if (!Array.isArray(items) || items.length === 0) {
return "[Merged and Forwarded Message - no sub-messages]";
}
// Filter to only sub-messages (those with upper_message_id, skip the merge_forward container itself)
const subMessages = items.filter((item) => item.upper_message_id);
if (subMessages.length === 0) {
return "[Merged and Forwarded Message - no sub-messages found]";
}
log?.(`feishu: merge_forward contains ${subMessages.length} sub-messages`);
// Sort by create_time
subMessages.sort((a, b) => {
const timeA = parseInt(a.create_time || "0", 10);
const timeB = parseInt(b.create_time || "0", 10);
return timeA - timeB;
});
// Format output
const lines: string[] = ["[Merged and Forwarded Messages]"];
const limitedMessages = subMessages.slice(0, maxMessages);
for (const item of limitedMessages) {
const msgContent = item.body?.content || "";
const msgType = item.msg_type || "text";
const formatted = formatSubMessageContent(msgContent, msgType);
lines.push(`- ${formatted}`);
}
if (subMessages.length > maxMessages) {
lines.push(`... and ${subMessages.length - maxMessages} more messages`);
}
return lines.join("\n");
}
/**
* Format sub-message content based on message type.
*/
function formatSubMessageContent(content: string, contentType: string): string {
try {
const parsed = JSON.parse(content);
switch (contentType) {
case "text":
return parsed.text || content;
case "post": {
const { textContent } = parsePostContent(content);
return textContent;
}
case "image":
return "[Image]";
case "file":
return `[File: ${parsed.file_name || "unknown"}]`;
case "audio":
return "[Audio]";
case "video":
return "[Video]";
case "sticker":
return "[Sticker]";
case "merge_forward":
return "[Nested Merged Forward]";
default:
return `[${contentType}]`;
}
} catch {
return content;
}
}
function checkBotMentioned(event: FeishuMessageEvent, botOpenId?: string): boolean {
if (!botOpenId) return false;
// Check for @all (@_all in Feishu) — treat as mentioning every bot
const rawContent = event.message.content ?? "";
if (rawContent.includes("@_all")) return true;
const mentions = event.message.mentions ?? [];
if (mentions.length > 0) {
// Rely on Feishu mention IDs; display names can vary by alias/context.
return mentions.some((m) => m.id.open_id === botOpenId);
}
// Post (rich text) messages may have empty message.mentions when they contain docs/paste
if (event.message.message_type === "post") {
const { mentionedOpenIds } = parsePostContent(event.message.content);
return mentionedOpenIds.some((id) => id === botOpenId);
}
return false;
}
function normalizeMentions(
text: string,
mentions?: FeishuMessageEvent["message"]["mentions"],
botStripId?: string,
): string {
if (!mentions || mentions.length === 0) return text;
const escaped = (value: string) => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const escapeName = (value: string) => value.replace(/</g, "&lt;").replace(/>/g, "&gt;");
let result = text;
for (const mention of mentions) {
const mentionId = mention.id.open_id;
const replacement =
botStripId && mentionId === botStripId
? ""
: mentionId
? `<at user_id="${mentionId}">${escapeName(mention.name)}</at>`
: `@${mention.name}`;
result = result.replace(new RegExp(escaped(mention.key), "g"), () => replacement).trim();
}
return result;
}
function normalizeFeishuCommandProbeBody(text: string): string {
if (!text) {
return "";
}
return text
.replace(/<at\b[^>]*>[^<]*<\/at>/giu, " ")
.replace(/(^|\s)@[^/\s]+(?=\s|$|\/)/gu, "$1")
.replace(/\s+/g, " ")
.trim();
}
/**
* Parse media keys from message content based on message type.
*/
function parseMediaKeys(
content: string,
messageType: string,
): {
imageKey?: string;
fileKey?: string;
fileName?: string;
} {
try {
const parsed = JSON.parse(content);
const imageKey = normalizeFeishuExternalKey(parsed.image_key);
const fileKey = normalizeFeishuExternalKey(parsed.file_key);
switch (messageType) {
case "image":
return { imageKey, fileName: parsed.file_name };
case "file":
return { fileKey, fileName: parsed.file_name };
case "audio":
return { fileKey, fileName: parsed.file_name };
case "video":
case "media":
// Video/media has both file_key (video) and image_key (thumbnail)
return { fileKey, imageKey, fileName: parsed.file_name };
case "sticker":
return { fileKey, fileName: parsed.file_name };
default:
return {};
}
} catch {
return {};
}
}
/**
* Map Feishu message type to messageResource.get resource type.
* Feishu messageResource API supports only: image | file.
*/
export function toMessageResourceType(messageType: string): "image" | "file" {
return messageType === "image" ? "image" : "file";
}
/**
* Infer placeholder text based on message type.
*/
function inferPlaceholder(messageType: string): string {
switch (messageType) {
case "image":
return "<media:image>";
case "file":
return "<media:document>";
case "audio":
return "<media:audio>";
case "video":
case "media":
return "<media:video>";
case "sticker":
return "<media:sticker>";
default:
return "<media:document>";
}
}
/**
* Resolve media from a Feishu message, downloading and saving to disk.
* Similar to Discord's resolveMediaList().
*/
async function resolveFeishuMediaList(params: {
cfg: ClawdbotConfig;
messageId: string;
messageType: string;
content: string;
maxBytes: number;
log?: (msg: string) => void;
accountId?: string;
}): Promise<FeishuMediaInfo[]> {
const { cfg, messageId, messageType, content, maxBytes, log, accountId } = params;
// Only process media message types (including post for embedded images)
const mediaTypes = ["image", "file", "audio", "video", "media", "sticker", "post"];
if (!mediaTypes.includes(messageType)) {
return [];
}
const out: FeishuMediaInfo[] = [];
const core = getFeishuRuntime();
// Handle post (rich text) messages with embedded images/media.
if (messageType === "post") {
const { imageKeys, mediaKeys: postMediaKeys } = parsePostContent(content);
if (imageKeys.length === 0 && postMediaKeys.length === 0) {
return [];
}
if (imageKeys.length > 0) {
log?.(`feishu: post message contains ${imageKeys.length} embedded image(s)`);
}
if (postMediaKeys.length > 0) {
log?.(`feishu: post message contains ${postMediaKeys.length} embedded media file(s)`);
}
for (const imageKey of imageKeys) {
try {
// Embedded images in post use messageResource API with image_key as file_key
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey: imageKey,
type: "image",
accountId,
});
let contentType = result.contentType;
if (!contentType) {
contentType = await core.media.detectMime({ buffer: result.buffer });
}
const saved = await core.channel.media.saveMediaBuffer(
result.buffer,
contentType,
"inbound",
maxBytes,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:image>",
});
log?.(`feishu: downloaded embedded image ${imageKey}, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download embedded image ${imageKey}: ${String(err)}`);
}
}
for (const media of postMediaKeys) {
try {
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey: media.fileKey,
type: "file",
accountId,
});
let contentType = result.contentType;
if (!contentType) {
contentType = await core.media.detectMime({ buffer: result.buffer });
}
const saved = await core.channel.media.saveMediaBuffer(
result.buffer,
contentType,
"inbound",
maxBytes,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:video>",
});
log?.(`feishu: downloaded embedded media ${media.fileKey}, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download embedded media ${media.fileKey}: ${String(err)}`);
}
}
return out;
}
// Handle other media types
const mediaKeys = parseMediaKeys(content, messageType);
if (!mediaKeys.imageKey && !mediaKeys.fileKey) {
return [];
}
try {
let buffer: Buffer;
let contentType: string | undefined;
let fileName: string | undefined;
// For message media, always use messageResource API
// The image.get API is only for images uploaded via im/v1/images, not for message attachments
const fileKey = mediaKeys.fileKey || mediaKeys.imageKey;
if (!fileKey) {
return [];
}
const resourceType = toMessageResourceType(messageType);
const result = await downloadMessageResourceFeishu({
cfg,
messageId,
fileKey,
type: resourceType,
accountId,
});
buffer = result.buffer;
contentType = result.contentType;
fileName = result.fileName || mediaKeys.fileName;
// Detect mime type if not provided
if (!contentType) {
contentType = await core.media.detectMime({ buffer });
}
// Save to disk using core's saveMediaBuffer
const saved = await core.channel.media.saveMediaBuffer(
buffer,
contentType,
"inbound",
maxBytes,
fileName,
);
out.push({
path: saved.path,
contentType: saved.contentType,
placeholder: inferPlaceholder(messageType),
});
log?.(`feishu: downloaded ${messageType} media, saved to ${saved.path}`);
} catch (err) {
log?.(`feishu: failed to download ${messageType} media: ${String(err)}`);
}
return out;
}
// --- Broadcast support ---
// Resolve broadcast agent list for a given peer (group) ID.
// Returns null if no broadcast config exists or the peer is not in the broadcast list.

View File

@ -0,0 +1,183 @@
import {
fetchMattermostChannel,
fetchMattermostUser,
sendMattermostTyping,
updateMattermostPost,
type MattermostChannel,
type MattermostClient,
type MattermostUser,
} from "./client.js";
import { buildButtonProps, type MattermostInteractionResponse } from "./interactions.js";
export type MattermostMediaKind = "image" | "audio" | "video" | "document" | "unknown";
export type MattermostMediaInfo = {
path: string;
contentType?: string;
kind: MattermostMediaKind;
};
const CHANNEL_CACHE_TTL_MS = 5 * 60_000;
const USER_CACHE_TTL_MS = 10 * 60_000;
type FetchRemoteMedia = (params: {
url: string;
requestInit?: RequestInit;
filePathHint?: string;
maxBytes: number;
ssrfPolicy?: { allowedHostnames?: string[] };
}) => Promise<{ buffer: Uint8Array; contentType?: string | null }>;
type SaveMediaBuffer = (
buffer: Uint8Array,
contentType: string | undefined,
direction: "inbound" | "outbound",
maxBytes: number,
) => Promise<{ path: string; contentType?: string | null }>;
export function createMattermostMonitorResources(params: {
accountId: string;
callbackUrl: string;
client: MattermostClient;
logger: { debug?: (...args: unknown[]) => void };
mediaMaxBytes: number;
fetchRemoteMedia: FetchRemoteMedia;
saveMediaBuffer: SaveMediaBuffer;
mediaKindFromMime: (contentType?: string) => MattermostMediaKind | null | undefined;
}) {
const {
accountId,
callbackUrl,
client,
logger,
mediaMaxBytes,
fetchRemoteMedia,
saveMediaBuffer,
mediaKindFromMime,
} = params;
const channelCache = new Map<string, { value: MattermostChannel | null; expiresAt: number }>();
const userCache = new Map<string, { value: MattermostUser | null; expiresAt: number }>();
const resolveMattermostMedia = async (
fileIds?: string[] | null,
): Promise<MattermostMediaInfo[]> => {
const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean);
if (ids.length === 0) {
return [];
}
const out: MattermostMediaInfo[] = [];
for (const fileId of ids) {
try {
const fetched = await fetchRemoteMedia({
url: `${client.apiBaseUrl}/files/${fileId}`,
requestInit: {
headers: {
Authorization: `Bearer ${client.token}`,
},
},
filePathHint: fileId,
maxBytes: mediaMaxBytes,
ssrfPolicy: { allowedHostnames: [new URL(client.baseUrl).hostname] },
});
const saved = await saveMediaBuffer(
Buffer.from(fetched.buffer),
fetched.contentType ?? undefined,
"inbound",
mediaMaxBytes,
);
const contentType = saved.contentType ?? fetched.contentType ?? undefined;
out.push({
path: saved.path,
contentType,
kind: mediaKindFromMime(contentType) ?? "unknown",
});
} catch (err) {
logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`);
}
}
return out;
};
const sendTypingIndicator = async (channelId: string, parentId?: string) => {
await sendMattermostTyping(client, { channelId, parentId });
};
const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => {
const cached = channelCache.get(channelId);
if (cached && cached.expiresAt > Date.now()) {
return cached.value;
}
try {
const info = await fetchMattermostChannel(client, channelId);
channelCache.set(channelId, {
value: info,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`);
channelCache.set(channelId, {
value: null,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return null;
}
};
const resolveUserInfo = async (userId: string): Promise<MattermostUser | null> => {
const cached = userCache.get(userId);
if (cached && cached.expiresAt > Date.now()) {
return cached.value;
}
try {
const info = await fetchMattermostUser(client, userId);
userCache.set(userId, {
value: info,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: user lookup failed: ${String(err)}`);
userCache.set(userId, {
value: null,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return null;
}
};
const buildModelPickerProps = (
channelId: string,
buttons: Array<unknown>,
): Record<string, unknown> | undefined =>
buildButtonProps({
callbackUrl,
accountId,
channelId,
buttons,
});
const updateModelPickerPost = async (params: {
channelId: string;
postId: string;
message: string;
buttons?: Array<unknown>;
}): Promise<MattermostInteractionResponse> => {
const props = buildModelPickerProps(params.channelId, params.buttons ?? []) ?? {
attachments: [],
};
await updateMattermostPost(client, params.postId, {
message: params.message,
props,
});
return {};
};
return {
resolveMattermostMedia,
sendTypingIndicator,
resolveChannelInfo,
resolveUserInfo,
updateModelPickerPost,
};
}

View File

@ -44,7 +44,6 @@ import {
type MattermostUser,
} from "./client.js";
import {
buildButtonProps,
computeInteractionCallbackUrl,
createMattermostInteractionHandler,
resolveInteractionCallbackPath,
@ -75,6 +74,7 @@ import {
resolveThreadSessionKeys,
} from "./monitor-helpers.js";
import { resolveOncharPrefixes, stripOncharPrefix } from "./monitor-onchar.js";
import { createMattermostMonitorResources, type MattermostMediaInfo } from "./monitor-resources.js";
import { registerMattermostMonitorSlashCommands } from "./monitor-slash.js";
import {
createMattermostConnectOnce,
@ -117,8 +117,6 @@ type MattermostReaction = {
};
const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000;
const RECENT_MATTERMOST_MESSAGE_MAX = 2000;
const CHANNEL_CACHE_TTL_MS = 5 * 60_000;
const USER_CACHE_TTL_MS = 10 * 60_000;
function isLoopbackHost(hostname: string): boolean {
return hostname === "localhost" || hostname === "127.0.0.1" || hostname === "::1";
@ -215,12 +213,6 @@ export function resolveMattermostThreadSessionContext(params: {
parentSessionKey: threadKeys.parentSessionKey,
};
}
type MattermostMediaInfo = {
path: string;
contentType?: string;
kind: MediaKind;
};
function buildMattermostAttachmentPlaceholder(mediaList: MattermostMediaInfo[]): string {
if (mediaList.length === 0) {
return "";
@ -286,6 +278,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
baseUrl,
botUserId,
});
const slashEnabled = getSlashCommandState(account.accountId) != null;
// ─── Interactive buttons registration ──────────────────────────────────────
// Derive a stable HMAC secret from the bot token so CLI and gateway share it.
@ -536,8 +529,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
log: (msg: string) => runtime.log?.(msg),
});
const channelCache = new Map<string, { value: MattermostChannel | null; expiresAt: number }>();
const userCache = new Map<string, { value: MattermostUser | null; expiresAt: number }>();
const logger = core.logging.getChildLogger({ module: "mattermost" });
const logVerboseMessage = (message: string) => {
if (!core.logging.shouldLogVerbose()) {
@ -570,123 +561,25 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
log: (message) => logVerboseMessage(message),
});
const resolveMattermostMedia = async (
fileIds?: string[] | null,
): Promise<MattermostMediaInfo[]> => {
const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean);
if (ids.length === 0) {
return [];
}
const out: MattermostMediaInfo[] = [];
for (const fileId of ids) {
try {
const fetched = await core.channel.media.fetchRemoteMedia({
url: `${client.apiBaseUrl}/files/${fileId}`,
requestInit: {
headers: {
Authorization: `Bearer ${client.token}`,
},
},
filePathHint: fileId,
maxBytes: mediaMaxBytes,
// Allow fetching from the Mattermost server host (may be localhost or
// a private IP). Without this, SSRF guards block media downloads.
// Credit: #22594 (@webclerk)
ssrfPolicy: { allowedHostnames: [new URL(client.baseUrl).hostname] },
});
const saved = await core.channel.media.saveMediaBuffer(
fetched.buffer,
fetched.contentType ?? undefined,
"inbound",
mediaMaxBytes,
);
const contentType = saved.contentType ?? fetched.contentType ?? undefined;
out.push({
path: saved.path,
contentType,
kind: core.media.mediaKindFromMime(contentType) ?? "unknown",
});
} catch (err) {
logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`);
}
}
return out;
};
const sendTypingIndicator = async (channelId: string, parentId?: string) => {
await sendMattermostTyping(client, { channelId, parentId });
};
const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => {
const cached = channelCache.get(channelId);
if (cached && cached.expiresAt > Date.now()) {
return cached.value;
}
try {
const info = await fetchMattermostChannel(client, channelId);
channelCache.set(channelId, {
value: info,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`);
channelCache.set(channelId, {
value: null,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return null;
}
};
const resolveUserInfo = async (userId: string): Promise<MattermostUser | null> => {
const cached = userCache.get(userId);
if (cached && cached.expiresAt > Date.now()) {
return cached.value;
}
try {
const info = await fetchMattermostUser(client, userId);
userCache.set(userId, {
value: info,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: user lookup failed: ${String(err)}`);
userCache.set(userId, {
value: null,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return null;
}
};
const buildModelPickerProps = (
channelId: string,
buttons: Array<unknown>,
): Record<string, unknown> | undefined =>
buildButtonProps({
callbackUrl,
accountId: account.accountId,
channelId,
buttons,
});
const updateModelPickerPost = async (params: {
channelId: string;
postId: string;
message: string;
buttons?: Array<unknown>;
}): Promise<MattermostInteractionResponse> => {
const props = buildModelPickerProps(params.channelId, params.buttons ?? []) ?? {
attachments: [],
};
await updateMattermostPost(client, params.postId, {
message: params.message,
props,
});
return {};
};
const {
resolveMattermostMedia,
sendTypingIndicator,
resolveChannelInfo,
resolveUserInfo,
updateModelPickerPost,
} = createMattermostMonitorResources({
accountId: account.accountId,
callbackUrl,
client,
logger: {
debug: (message) => logger.debug?.(String(message)),
},
mediaMaxBytes,
fetchRemoteMedia: (params) => core.channel.media.fetchRemoteMedia(params),
saveMediaBuffer: (buffer, contentType, direction, maxBytes) =>
core.channel.media.saveMediaBuffer(Buffer.from(buffer), contentType, direction, maxBytes),
mediaKindFromMime: (contentType) => core.media.mediaKindFromMime(contentType) as MediaKind,
});
const runModelPickerCommand = async (params: {
commandText: string;

View File

@ -0,0 +1,373 @@
import type { Message } from "@grammyjs/types";
import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "openclaw/plugin-sdk/reply-runtime";
import { danger, logVerbose, warn } from "openclaw/plugin-sdk/runtime-env";
import {
hasInboundMedia,
isRecoverableMediaGroupError,
resolveInboundMediaFileId,
} from "./bot-handlers.media.js";
import type { TelegramMediaRef } from "./bot-message-context.js";
import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js";
import { resolveMedia } from "./bot/delivery.js";
import type { TelegramContext } from "./bot/types.js";
import type { TelegramTransport } from "./fetch.js";
export type TelegramDebounceLane = "default" | "forward";
export type TelegramDebounceEntry = {
ctx: TelegramContext;
msg: Message;
allMedia: TelegramMediaRef[];
storeAllowFrom: string[];
debounceKey: string | null;
debounceLane: TelegramDebounceLane;
botUsername?: string;
};
export type TextFragmentEntry = {
key: string;
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
timer: ReturnType<typeof setTimeout>;
};
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
type TelegramBotApi = {
sendMessage: (
chatId: number | string,
text: string,
params?: { message_thread_id?: number },
) => Promise<unknown>;
getFile: (fileId: string) => Promise<{ file_path?: string }>;
};
export function createTelegramInboundBufferRuntime(params: {
accountId?: string | null;
bot: { api: TelegramBotApi };
cfg: OpenClawConfig;
logger: { warn: (...args: unknown[]) => void };
mediaMaxBytes: number;
opts: {
token: string;
testTimings?: {
textFragmentGapMs?: number;
mediaGroupFlushMs?: number;
};
};
processMessage: (
ctx: TelegramContext,
media: TelegramMediaRef[],
storeAllowFrom: string[],
metadata?: { messageIdOverride?: string },
replyMedia?: TelegramMediaRef[],
) => Promise<void>;
loadStoreAllowFrom: () => Promise<string[]>;
runtime: {
error?: (message: string) => void;
};
telegramTransport?: TelegramTransport;
}) {
const {
accountId,
bot,
cfg,
logger,
mediaMaxBytes,
opts,
processMessage,
loadStoreAllowFrom,
runtime,
telegramTransport,
} = params;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
typeof opts.testTimings?.textFragmentGapMs === "number" &&
Number.isFinite(opts.testTimings.textFragmentGapMs)
? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs))
: DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS;
const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1;
const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12;
const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000;
const mediaGroupTimeoutMs =
typeof opts.testTimings?.mediaGroupFlushMs === "number" &&
Number.isFinite(opts.testTimings.mediaGroupFlushMs)
? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs))
: MEDIA_GROUP_TIMEOUT_MS;
const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" });
const FORWARD_BURST_DEBOUNCE_MS = 80;
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
let mediaGroupProcessing: Promise<void> = Promise.resolve();
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
let textFragmentProcessing: Promise<void> = Promise.resolve();
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
const forwardMeta = msg as {
forward_origin?: unknown;
forward_from?: unknown;
forward_from_chat?: unknown;
forward_sender_name?: unknown;
forward_date?: unknown;
};
return (forwardMeta.forward_origin ??
forwardMeta.forward_from ??
forwardMeta.forward_from_chat ??
forwardMeta.forward_sender_name ??
forwardMeta.forward_date)
? "forward"
: "default";
};
const buildSyntheticTextMessage = (params: {
base: Message;
text: string;
date?: number;
from?: Message["from"];
}): Message => ({
...params.base,
...(params.from ? { from: params.from } : {}),
text: params.text,
caption: undefined,
caption_entities: undefined,
entities: undefined,
...(params.date != null ? { date: params.date } : {}),
});
const buildSyntheticContext = (
ctx: Pick<TelegramContext, "me"> & { getFile?: unknown },
message: Message,
): TelegramContext => {
const getFile =
typeof ctx.getFile === "function"
? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object)
: async () => ({});
return { message, me: ctx.me, getFile };
};
const resolveReplyMediaForMessage = async (
ctx: TelegramContext,
msg: Message,
): Promise<TelegramMediaRef[]> => {
const replyMessage = msg.reply_to_message;
if (!replyMessage || !hasInboundMedia(replyMessage)) {
return [];
}
const replyFileId = resolveInboundMediaFileId(replyMessage);
if (!replyFileId) {
return [];
}
try {
const media = await resolveMedia(
{
message: replyMessage,
me: ctx.me,
getFile: async () => await bot.api.getFile(replyFileId),
},
mediaMaxBytes,
opts.token,
telegramTransport,
);
if (!media) {
return [];
}
return [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
];
} catch (err) {
logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed");
return [];
}
};
const processMediaGroup = async (entry: MediaGroupEntry) => {
try {
entry.messages.sort(
(a: { msg: Message; ctx: TelegramContext }, b: { msg: Message; ctx: TelegramContext }) =>
a.msg.message_id - b.msg.message_id,
);
const captionMsg = entry.messages.find((item) => item.msg.caption || item.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
const allMedia: TelegramMediaRef[] = [];
for (const { ctx } of entry.messages) {
let media;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
} catch (mediaErr) {
if (!isRecoverableMediaGroupError(mediaErr)) {
throw mediaErr;
}
runtime.error?.(
warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`),
);
continue;
}
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
}
}
const storeAllowFrom = await loadStoreAllowFrom();
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
}
};
const flushTextFragments = async (entry: TextFragmentEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const first = entry.messages[0];
const last = entry.messages.at(-1);
if (!first || !last) {
return;
}
const combinedText = entry.messages.map((item) => item.msg.text ?? "").join("");
if (!combinedText.trim()) {
return;
}
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const storeAllowFrom = await loadStoreAllowFrom();
await processMessage(buildSyntheticContext(first.ctx, syntheticMessage), [], storeAllowFrom, {
messageIdOverride: String(last.msg.message_id),
});
} catch (err) {
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
}
};
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(entry);
})
.catch(() => undefined);
await textFragmentProcessing;
};
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentBuffer.delete(entry.key);
await queueTextFragmentFlush(entry);
};
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
await runTextFragmentFlush(entry);
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
};
const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({
debounceMs,
resolveDebounceMs: (entry) =>
entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs,
buildKey: (entry) => entry.debounceKey,
shouldDebounce: (entry) => {
const text = entry.msg.text ?? entry.msg.caption ?? "";
const hasDebounceableText = shouldDebounceTextInbound({
text,
cfg,
commandOptions: { botUsername: entry.botUsername },
});
if (entry.debounceLane === "forward") {
return hasDebounceableText || entry.allMedia.length > 0;
}
return hasDebounceableText && entry.allMedia.length === 0;
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg);
await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia);
return;
}
const combinedText = entries
.map((entry) => entry.msg.text ?? entry.msg.caption ?? "")
.filter(Boolean)
.join("\n");
const combinedMedia = entries.flatMap((entry) => entry.allMedia);
if (!combinedText.trim() && combinedMedia.length === 0) {
return;
}
const first = entries[0];
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
const replyMedia = await resolveReplyMediaForMessage(first.ctx, syntheticMessage);
await processMessage(
buildSyntheticContext(first.ctx, syntheticMessage),
combinedMedia,
first.storeAllowFrom,
messageIdOverride ? { messageIdOverride } : undefined,
replyMedia,
);
},
onError: (err, items) => {
runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`));
const chatId = items[0]?.msg.chat.id;
if (chatId != null) {
const threadId = items[0]?.msg.message_thread_id;
void bot.api
.sendMessage(
chatId,
"Something went wrong while processing your message. Please try again.",
threadId != null ? { message_thread_id: threadId } : undefined,
)
.catch((sendErr) => {
logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`);
});
}
},
});
return {
buildSyntheticContext,
buildSyntheticTextMessage,
inboundDebouncer,
mediaGroupBuffer,
mediaGroupProcessing: () => mediaGroupProcessing,
setMediaGroupProcessing: (next: Promise<void>) => {
mediaGroupProcessing = next;
},
mediaGroupTimeoutMs,
processMediaGroup,
textFragmentBuffer,
textFragmentProcessing: () => textFragmentProcessing,
setTextFragmentProcessing: (next: Promise<void>) => {
textFragmentProcessing = next;
},
scheduleTextFragmentFlush,
flushTextFragments,
resolveReplyMediaForMessage,
resolveTelegramDebounceLane,
TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS,
TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP,
TELEGRAM_TEXT_FRAGMENT_MAX_PARTS,
TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS,
TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS,
};
}

View File

@ -1,7 +1,6 @@
import type { Message, ReactionTypeEmoji } from "@grammyjs/types";
import { resolveAgentDir, resolveDefaultAgentId } from "openclaw/plugin-sdk/agent-runtime";
import { resolveDefaultModelForAgent } from "openclaw/plugin-sdk/agent-runtime";
import { shouldDebounceTextInbound } from "openclaw/plugin-sdk/channel-runtime";
import { resolveChannelConfigWrites } from "openclaw/plugin-sdk/channel-runtime";
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import { writeConfigFile } from "openclaw/plugin-sdk/config-runtime";
@ -26,10 +25,6 @@ import {
} from "openclaw/plugin-sdk/conversation-runtime";
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
import { dispatchPluginInteractiveHandler } from "openclaw/plugin-sdk/plugin-runtime";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "openclaw/plugin-sdk/reply-runtime";
import { buildCommandsPaginationKeyboard } from "openclaw/plugin-sdk/reply-runtime";
import {
buildModelsProviderData,
@ -47,21 +42,19 @@ import {
normalizeDmAllowFromWithStore,
type NormalizedAllowFrom,
} from "./bot-access.js";
import {
createTelegramInboundBufferRuntime,
type TextFragmentEntry,
} from "./bot-handlers.buffers.js";
import {
APPROVE_CALLBACK_DATA_RE,
hasInboundMedia,
hasReplyTargetMedia,
isMediaSizeLimitError,
isRecoverableMediaGroupError,
resolveInboundMediaFileId,
} from "./bot-handlers.media.js";
import type { TelegramMediaRef } from "./bot-message-context.js";
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
import {
MEDIA_GROUP_TIMEOUT_MS,
type MediaGroupEntry,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
import { type TelegramUpdateKeyContext } from "./bot-updates.js";
import { resolveMedia } from "./bot/delivery.js";
import {
getTelegramTextParts,
@ -116,159 +109,41 @@ export const registerTelegramHandlers = ({
processMessage,
logger,
}: RegisterTelegramHandlerParams) => {
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
typeof opts.testTimings?.textFragmentGapMs === "number" &&
Number.isFinite(opts.testTimings.textFragmentGapMs)
? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs))
: DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS;
const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1;
const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12;
const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000;
const mediaGroupTimeoutMs =
typeof opts.testTimings?.mediaGroupFlushMs === "number" &&
Number.isFinite(opts.testTimings.mediaGroupFlushMs)
? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs))
: MEDIA_GROUP_TIMEOUT_MS;
const loadStoreAllowFrom = async () =>
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
let mediaGroupProcessing: Promise<void> = Promise.resolve();
type TextFragmentEntry = {
key: string;
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
timer: ReturnType<typeof setTimeout>;
};
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
let textFragmentProcessing: Promise<void> = Promise.resolve();
const debounceMs = resolveInboundDebounceMs({ cfg, channel: "telegram" });
const FORWARD_BURST_DEBOUNCE_MS = 80;
type TelegramDebounceLane = "default" | "forward";
type TelegramDebounceEntry = {
ctx: TelegramContext;
msg: Message;
allMedia: TelegramMediaRef[];
storeAllowFrom: string[];
debounceKey: string | null;
debounceLane: TelegramDebounceLane;
botUsername?: string;
};
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
const forwardMeta = msg as {
forward_origin?: unknown;
forward_from?: unknown;
forward_from_chat?: unknown;
forward_sender_name?: unknown;
forward_date?: unknown;
};
return (forwardMeta.forward_origin ??
forwardMeta.forward_from ??
forwardMeta.forward_from_chat ??
forwardMeta.forward_sender_name ??
forwardMeta.forward_date)
? "forward"
: "default";
};
const buildSyntheticTextMessage = (params: {
base: Message;
text: string;
date?: number;
from?: Message["from"];
}): Message => ({
...params.base,
...(params.from ? { from: params.from } : {}),
text: params.text,
caption: undefined,
caption_entities: undefined,
entities: undefined,
...(params.date != null ? { date: params.date } : {}),
});
const buildSyntheticContext = (
ctx: Pick<TelegramContext, "me"> & { getFile?: unknown },
message: Message,
): TelegramContext => {
const getFile =
typeof ctx.getFile === "function"
? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object)
: async () => ({});
return { message, me: ctx.me, getFile };
};
const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({
debounceMs,
resolveDebounceMs: (entry) =>
entry.debounceLane === "forward" ? FORWARD_BURST_DEBOUNCE_MS : debounceMs,
buildKey: (entry) => entry.debounceKey,
shouldDebounce: (entry) => {
const text = entry.msg.text ?? entry.msg.caption ?? "";
const hasDebounceableText = shouldDebounceTextInbound({
text,
cfg,
commandOptions: { botUsername: entry.botUsername },
});
if (entry.debounceLane === "forward") {
// Forwarded bursts often split text + media into adjacent updates.
// Debounce media-only forward entries too so they can coalesce.
return hasDebounceableText || entry.allMedia.length > 0;
}
if (!hasDebounceableText) {
return false;
}
return entry.allMedia.length === 0;
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
const replyMedia = await resolveReplyMediaForMessage(last.ctx, last.msg);
await processMessage(last.ctx, last.allMedia, last.storeAllowFrom, undefined, replyMedia);
return;
}
const combinedText = entries
.map((entry) => entry.msg.text ?? entry.msg.caption ?? "")
.filter(Boolean)
.join("\n");
const combinedMedia = entries.flatMap((entry) => entry.allMedia);
if (!combinedText.trim() && combinedMedia.length === 0) {
return;
}
const first = entries[0];
const baseCtx = first.ctx;
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
const syntheticCtx = buildSyntheticContext(baseCtx, syntheticMessage);
const replyMedia = await resolveReplyMediaForMessage(baseCtx, syntheticMessage);
await processMessage(
syntheticCtx,
combinedMedia,
first.storeAllowFrom,
messageIdOverride ? { messageIdOverride } : undefined,
replyMedia,
);
},
onError: (err, items) => {
runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`));
const chatId = items[0]?.msg.chat.id;
if (chatId != null) {
const threadId = items[0]?.msg.message_thread_id;
void bot.api
.sendMessage(
chatId,
"Something went wrong while processing your message. Please try again.",
threadId != null ? { message_thread_id: threadId } : undefined,
)
.catch((sendErr) => {
logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`);
});
}
},
const {
buildSyntheticContext,
buildSyntheticTextMessage,
inboundDebouncer,
mediaGroupBuffer,
mediaGroupProcessing,
setMediaGroupProcessing,
mediaGroupTimeoutMs,
processMediaGroup,
textFragmentBuffer,
textFragmentProcessing,
setTextFragmentProcessing,
scheduleTextFragmentFlush,
flushTextFragments,
resolveReplyMediaForMessage,
resolveTelegramDebounceLane,
TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS,
TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP,
TELEGRAM_TEXT_FRAGMENT_MAX_PARTS,
TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS,
TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS,
} = createTelegramInboundBufferRuntime({
accountId,
bot,
cfg,
logger,
mediaMaxBytes,
opts,
processMessage,
loadStoreAllowFrom,
runtime,
telegramTransport,
});
const resolveTelegramSessionState = (params: {
@ -352,139 +227,6 @@ export const registerTelegramHandlers = ({
};
};
const processMediaGroup = async (entry: MediaGroupEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const captionMsg = entry.messages.find((m) => m.msg.caption || m.msg.text);
const primaryEntry = captionMsg ?? entry.messages[0];
const allMedia: TelegramMediaRef[] = [];
for (const { ctx } of entry.messages) {
let media;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
} catch (mediaErr) {
if (!isRecoverableMediaGroupError(mediaErr)) {
throw mediaErr;
}
runtime.log?.(
warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`),
);
continue;
}
if (media) {
allMedia.push({
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
});
}
}
const storeAllowFrom = await loadStoreAllowFrom();
const replyMedia = await resolveReplyMediaForMessage(primaryEntry.ctx, primaryEntry.msg);
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom, undefined, replyMedia);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
}
};
const flushTextFragments = async (entry: TextFragmentEntry) => {
try {
entry.messages.sort((a, b) => a.msg.message_id - b.msg.message_id);
const first = entry.messages[0];
const last = entry.messages.at(-1);
if (!first || !last) {
return;
}
const combinedText = entry.messages.map((m) => m.msg.text ?? "").join("");
if (!combinedText.trim()) {
return;
}
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
date: last.msg.date ?? first.msg.date,
});
const storeAllowFrom = await loadStoreAllowFrom();
const baseCtx = first.ctx;
await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, {
messageIdOverride: String(last.msg.message_id),
});
} catch (err) {
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
}
};
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(entry);
})
.catch(() => undefined);
await textFragmentProcessing;
};
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentBuffer.delete(entry.key);
await queueTextFragmentFlush(entry);
};
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
await runTextFragmentFlush(entry);
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
};
const loadStoreAllowFrom = async () =>
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
const resolveReplyMediaForMessage = async (
ctx: TelegramContext,
msg: Message,
): Promise<TelegramMediaRef[]> => {
const replyMessage = msg.reply_to_message;
if (!replyMessage || !hasInboundMedia(replyMessage)) {
return [];
}
const replyFileId = resolveInboundMediaFileId(replyMessage);
if (!replyFileId) {
return [];
}
try {
const media = await resolveMedia(
{
message: replyMessage,
me: ctx.me,
getFile: async () => await bot.api.getFile(replyFileId),
},
mediaMaxBytes,
opts.token,
telegramTransport,
);
if (!media) {
return [];
}
return [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
];
} catch (err) {
logger.warn({ chatId: msg.chat.id, error: String(err) }, "reply media fetch failed");
return [];
}
};
const isAllowlistAuthorized = (
allow: NormalizedAllowFrom,
senderId: string,
@ -921,12 +663,14 @@ export const registerTelegramHandlers = ({
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
clearTimeout(existing.timer);
textFragmentBuffer.delete(key);
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined);
await textFragmentProcessing;
setTextFragmentProcessing(
textFragmentProcessing()
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined),
);
await textFragmentProcessing();
}
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
@ -951,24 +695,28 @@ export const registerTelegramHandlers = ({
existing.messages.push({ msg, ctx });
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined);
await mediaGroupProcessing;
setMediaGroupProcessing(
mediaGroupProcessing()
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined),
);
await mediaGroupProcessing();
}, mediaGroupTimeoutMs);
} else {
const entry: MediaGroupEntry = {
const entry = {
messages: [{ msg, ctx }],
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined);
await mediaGroupProcessing;
setMediaGroupProcessing(
mediaGroupProcessing()
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined),
);
await mediaGroupProcessing();
}, mediaGroupTimeoutMs),
};
mediaGroupBuffer.set(mediaGroupId, entry);

View File

@ -1,7 +1,8 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { monitorTelegramProvider } from "./monitor.js";
import { tagTelegramNetworkError } from "./network-errors.js";
type MonitorTelegramOpts = import("./monitor.js").MonitorTelegramOpts;
type MockCtx = {
message: {
message_id?: number;
@ -136,6 +137,7 @@ function mockRunOnceAndAbort(abort: AbortController) {
}
async function expectOffsetConfirmationSkipped(offset: number | null) {
const { monitorTelegramProvider } = await import("./monitor.js");
readTelegramUpdateOffsetSpy.mockResolvedValueOnce(offset);
const abort = new AbortController();
api.getUpdates.mockReset();
@ -149,6 +151,7 @@ async function expectOffsetConfirmationSkipped(offset: number | null) {
}
async function runMonitorAndCaptureStartupOrder(params?: { persistedOffset?: number | null }) {
const { monitorTelegramProvider } = await import("./monitor.js");
if (params && "persistedOffset" in params) {
readTelegramUpdateOffsetSpy.mockResolvedValueOnce(params.persistedOffset ?? null);
}
@ -203,9 +206,8 @@ function expectRecoverableRetryState(expectedRunCalls: number) {
expect(runSpy).toHaveBeenCalledTimes(expectedRunCalls);
}
async function monitorWithAutoAbort(
opts: Omit<Parameters<typeof monitorTelegramProvider>[0], "abortSignal"> = {},
) {
async function monitorWithAutoAbort(opts: Omit<MonitorTelegramOpts, "abortSignal"> = {}) {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
mockRunOnceAndAbort(abort);
await monitorTelegramProvider({
@ -215,8 +217,8 @@ async function monitorWithAutoAbort(
});
}
vi.mock("../../../src/config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../../src/config/config.js")>();
vi.mock("openclaw/plugin-sdk/config-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/config-runtime")>();
return {
...actual,
loadConfig,
@ -260,14 +262,22 @@ vi.mock("@grammyjs/runner", () => ({
run: runSpy,
}));
vi.mock("../../../src/infra/backoff.js", () => ({
computeBackoff,
sleepWithAbort,
}));
vi.mock("openclaw/plugin-sdk/infra-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/infra-runtime")>();
return {
...actual,
computeBackoff,
sleepWithAbort,
};
});
vi.mock("../../../src/infra/unhandled-rejections.js", () => ({
registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock,
}));
vi.mock("openclaw/plugin-sdk/runtime-env", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/runtime-env")>();
return {
...actual,
registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock,
};
});
vi.mock("./webhook.js", () => ({
startTelegramWebhook: startTelegramWebhookSpy,
@ -282,6 +292,16 @@ vi.mock("./update-offset-store.js", () => ({
writeTelegramUpdateOffset: vi.fn(async () => undefined),
}));
vi.mock("openclaw/plugin-sdk/reply-runtime", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/reply-runtime")>();
return {
...actual,
getReplyFromConfig: async (ctx: { Body?: string }) => ({
text: `echo:${ctx.Body}`,
}),
};
});
vi.mock("../../../src/auto-reply/reply.js", () => ({
getReplyFromConfig: async (ctx: { Body?: string }) => ({
text: `echo:${ctx.Body}`,
@ -292,6 +312,7 @@ describe("monitorTelegramProvider (grammY)", () => {
let consoleErrorSpy: { mockRestore: () => void } | undefined;
beforeEach(() => {
vi.resetModules();
loadConfig.mockReturnValue({
agents: { defaults: { maxConcurrent: 2 } },
channels: { telegram: {} },
@ -387,6 +408,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("retries on recoverable undici fetch errors", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const networkError = makeRecoverableFetchError();
runSpy
@ -410,6 +432,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("retries recoverable deleteWebhook failures before polling", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const cleanupError = makeRecoverableFetchError();
api.deleteWebhook.mockReset();
@ -423,6 +446,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("retries setup-time recoverable errors before starting polling", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const setupError = makeRecoverableFetchError();
createTelegramBotErrors.push(setupError);
@ -436,6 +460,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("awaits runner.stop before retrying after recoverable polling error", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const recoverableError = makeRecoverableFetchError();
let firstStopped = false;
@ -463,6 +488,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("stops bot instance when polling cycle exits", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
mockRunOnceAndAbort(abort);
@ -473,6 +499,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("clears bounded cleanup timers after a clean stop", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
vi.useFakeTimers();
try {
const abort = new AbortController();
@ -487,6 +514,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("surfaces non-recoverable errors", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
runSpy.mockImplementationOnce(() =>
makeRunnerStub({
task: () => Promise.reject(new Error("bad token")),
@ -497,6 +525,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("force-restarts polling when unhandled network rejection stalls runner", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const { stop } = mockRunOnceWithStalledPollingRunner();
mockRunOnceAndAbort(abort);
@ -504,7 +533,7 @@ describe("monitorTelegramProvider (grammY)", () => {
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1));
expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true);
emitUnhandledRejection(makeTaggedPollingFetchError());
await monitor;
expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1);
@ -514,6 +543,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("reuses the resolved transport across polling restarts", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
vi.useFakeTimers({ shouldAdvanceTime: true });
try {
const telegramTransport = {
@ -542,6 +572,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("aborts the active Telegram fetch when unhandled network rejection forces restart", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const { stop } = mockRunOnceWithStalledPollingRunner();
mockRunOnceAndAbort(abort);
@ -552,7 +583,7 @@ describe("monitorTelegramProvider (grammY)", () => {
expect(firstSignal).toBeInstanceOf(AbortSignal);
expect((firstSignal as AbortSignal).aborted).toBe(false);
expect(emitUnhandledRejection(makeTaggedPollingFetchError())).toBe(true);
emitUnhandledRejection(makeTaggedPollingFetchError());
await monitor;
expect((firstSignal as AbortSignal).aborted).toBe(true);
@ -560,6 +591,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("ignores unrelated process-level network errors while telegram polling is active", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const { stop } = mockRunOnceWithStalledPollingRunner();
@ -585,6 +617,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("passes configured webhookHost to webhook listener", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
await monitorTelegramProvider({
token: "tok",
useWebhook: true,
@ -609,6 +642,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("webhook mode waits for abort signal before returning", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
const settled = vi.fn();
const monitor = monitorTelegramProvider({
@ -628,6 +662,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("force-restarts polling when getUpdates stalls (watchdog)", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
vi.useFakeTimers({ shouldAdvanceTime: true });
const abort = new AbortController();
const { stop } = mockRunOnceWithStalledPollingRunner();
@ -668,6 +703,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("resets webhookCleared latch on 409 conflict so deleteWebhook re-runs", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
const abort = new AbortController();
api.deleteWebhook.mockReset();
api.deleteWebhook.mockResolvedValue(true);
@ -706,6 +742,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("falls back to configured webhookSecret when not passed explicitly", async () => {
const { monitorTelegramProvider } = await import("./monitor.js");
await monitorTelegramProvider({
token: "tok",
useWebhook: true,

View File

@ -0,0 +1,362 @@
import type { RuntimeEnv } from "../../api.js";
import type { PendingApproval, TlonSettingsStore } from "../settings.js";
import { normalizeShip } from "../targets.js";
import { sendDm } from "../urbit/send.js";
import type { UrbitSSEClient } from "../urbit/sse-client.js";
import {
findPendingApproval,
formatApprovalConfirmation,
formatApprovalRequest,
formatBlockedList,
formatPendingList,
parseAdminCommand,
parseApprovalResponse,
removePendingApproval,
} from "./approval.js";
type TlonApprovalApi = Pick<UrbitSSEClient, "poke" | "scry">;
type ApprovedMessageProcessor = (approval: PendingApproval) => Promise<void>;
export function createTlonApprovalRuntime(params: {
api: TlonApprovalApi;
runtime: RuntimeEnv;
botShipName: string;
getPendingApprovals: () => PendingApproval[];
setPendingApprovals: (approvals: PendingApproval[]) => void;
getCurrentSettings: () => TlonSettingsStore;
setCurrentSettings: (settings: TlonSettingsStore) => void;
getEffectiveDmAllowlist: () => string[];
setEffectiveDmAllowlist: (ships: string[]) => void;
getEffectiveOwnerShip: () => string | null;
processApprovedMessage: ApprovedMessageProcessor;
refreshWatchedChannels: () => Promise<number>;
}) {
const {
api,
runtime,
botShipName,
getPendingApprovals,
setPendingApprovals,
getCurrentSettings,
setCurrentSettings,
getEffectiveDmAllowlist,
setEffectiveDmAllowlist,
getEffectiveOwnerShip,
processApprovedMessage,
refreshWatchedChannels,
} = params;
const savePendingApprovals = async (): Promise<void> => {
try {
await api.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "pendingApprovals",
value: JSON.stringify(getPendingApprovals()),
},
},
});
} catch (err) {
runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`);
}
};
const addToDmAllowlist = async (ship: string): Promise<void> => {
const normalizedShip = normalizeShip(ship);
const nextAllowlist = getEffectiveDmAllowlist().includes(normalizedShip)
? getEffectiveDmAllowlist()
: [...getEffectiveDmAllowlist(), normalizedShip];
setEffectiveDmAllowlist(nextAllowlist);
try {
await api.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "dmAllowlist",
value: nextAllowlist,
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`);
}
};
const addToChannelAllowlist = async (ship: string, channelNest: string): Promise<void> => {
const normalizedShip = normalizeShip(ship);
const currentSettings = getCurrentSettings();
const channelRules = currentSettings.channelRules ?? {};
const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] };
const allowedShips = [...(rule.allowedShips ?? [])];
if (!allowedShips.includes(normalizedShip)) {
allowedShips.push(normalizedShip);
}
const updatedRules = {
...channelRules,
[channelNest]: { ...rule, allowedShips },
};
setCurrentSettings({ ...currentSettings, channelRules: updatedRules });
try {
await api.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "channelRules",
value: JSON.stringify(updatedRules),
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`);
}
};
const blockShip = async (ship: string): Promise<void> => {
const normalizedShip = normalizeShip(ship);
try {
await api.poke({
app: "chat",
mark: "chat-block-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`);
}
};
const isShipBlocked = async (ship: string): Promise<boolean> => {
const normalizedShip = normalizeShip(ship);
try {
const blocked = (await api.scry("/chat/blocked.json")) as string[] | undefined;
return (
Array.isArray(blocked) && blocked.some((item) => normalizeShip(item) === normalizedShip)
);
} catch (err) {
runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`);
return false;
}
};
const getBlockedShips = async (): Promise<string[]> => {
try {
const blocked = (await api.scry("/chat/blocked.json")) as string[] | undefined;
return Array.isArray(blocked) ? blocked : [];
} catch (err) {
runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`);
return [];
}
};
const unblockShip = async (ship: string): Promise<boolean> => {
const normalizedShip = normalizeShip(ship);
try {
await api.poke({
app: "chat",
mark: "chat-unblock-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`);
return true;
} catch (err) {
runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`);
return false;
}
};
const sendOwnerNotification = async (message: string): Promise<void> => {
const ownerShip = getEffectiveOwnerShip();
if (!ownerShip) {
runtime.log?.("[tlon] No ownerShip configured, cannot send notification");
return;
}
try {
await sendDm({
api,
fromShip: botShipName,
toShip: ownerShip,
text: message,
});
runtime.log?.(`[tlon] Sent notification to owner ${ownerShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`);
}
};
const queueApprovalRequest = async (approval: PendingApproval): Promise<void> => {
if (await isShipBlocked(approval.requestingShip)) {
runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`);
return;
}
const approvals = getPendingApprovals();
const existingIndex = approvals.findIndex(
(item) =>
item.type === approval.type &&
item.requestingShip === approval.requestingShip &&
(approval.type !== "channel" || item.channelNest === approval.channelNest) &&
(approval.type !== "group" || item.groupFlag === approval.groupFlag),
);
if (existingIndex !== -1) {
const existing = approvals[existingIndex];
if (approval.originalMessage) {
existing.originalMessage = approval.originalMessage;
existing.messagePreview = approval.messagePreview;
}
runtime.log?.(
`[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`,
);
await savePendingApprovals();
await sendOwnerNotification(formatApprovalRequest(existing));
return;
}
setPendingApprovals([...approvals, approval]);
await savePendingApprovals();
await sendOwnerNotification(formatApprovalRequest(approval));
runtime.log?.(
`[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`,
);
};
const handleApprovalResponse = async (text: string): Promise<boolean> => {
const parsed = parseApprovalResponse(text);
if (!parsed) {
return false;
}
const approval = findPendingApproval(getPendingApprovals(), parsed.id);
if (!approval) {
await sendOwnerNotification(
`No pending approval found${parsed.id ? ` for ID: ${parsed.id}` : ""}`,
);
return true;
}
if (parsed.action === "approve") {
switch (approval.type) {
case "dm":
await addToDmAllowlist(approval.requestingShip);
if (approval.originalMessage) {
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} after approval`,
);
await processApprovedMessage(approval);
}
break;
case "channel":
if (approval.channelNest) {
await addToChannelAllowlist(approval.requestingShip, approval.channelNest);
if (approval.originalMessage) {
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`,
);
await processApprovedMessage(approval);
}
}
break;
case "group":
if (approval.groupFlag) {
try {
await api.poke({
app: "groups",
mark: "group-join",
json: {
flag: approval.groupFlag,
"join-all": true,
},
});
runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`);
setTimeout(() => {
void (async () => {
try {
const newCount = await refreshWatchedChannels();
if (newCount > 0) {
runtime.log?.(
`[tlon] Discovered ${newCount} new channel(s) after joining group`,
);
}
} catch (err) {
runtime.log?.(
`[tlon] Channel discovery after group join failed: ${String(err)}`,
);
}
})();
}, 2000);
} catch (err) {
runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`);
}
}
break;
}
await sendOwnerNotification(formatApprovalConfirmation(approval, "approve"));
} else if (parsed.action === "block") {
await blockShip(approval.requestingShip);
await sendOwnerNotification(formatApprovalConfirmation(approval, "block"));
} else {
await sendOwnerNotification(formatApprovalConfirmation(approval, "deny"));
}
setPendingApprovals(removePendingApproval(getPendingApprovals(), approval.id));
await savePendingApprovals();
return true;
};
const handleAdminCommand = async (text: string): Promise<boolean> => {
const command = parseAdminCommand(text);
if (!command) {
return false;
}
switch (command.type) {
case "blocked": {
const blockedShips = await getBlockedShips();
await sendOwnerNotification(formatBlockedList(blockedShips));
runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`);
return true;
}
case "pending":
await sendOwnerNotification(formatPendingList(getPendingApprovals()));
runtime.log?.(
`[tlon] Owner requested pending approvals list (${getPendingApprovals().length} pending)`,
);
return true;
case "unblock": {
const shipToUnblock = command.ship;
if (!(await isShipBlocked(shipToUnblock))) {
await sendOwnerNotification(`${shipToUnblock} is not blocked.`);
return true;
}
const success = await unblockShip(shipToUnblock);
await sendOwnerNotification(
success ? `Unblocked ${shipToUnblock}.` : `Failed to unblock ${shipToUnblock}.`,
);
return true;
}
}
};
return {
queueApprovalRequest,
handleApprovalResponse,
handleAdminCommand,
};
}

View File

@ -0,0 +1,53 @@
import type { RuntimeEnv } from "../../api.js";
import { extractCites, extractMessageText, type ParsedCite } from "./utils.js";
type TlonScryApi = {
scry: (path: string) => Promise<unknown>;
};
export function createTlonCitationResolver(params: { api: TlonScryApi; runtime: RuntimeEnv }) {
const { api, runtime } = params;
const resolveCiteContent = async (cite: ParsedCite): Promise<string | null> => {
if (cite.type !== "chan" || !cite.nest || !cite.postId) {
return null;
}
try {
const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`;
runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`);
const data: any = await api.scry(scryPath);
if (data?.essay?.content) {
return extractMessageText(data.essay.content) || null;
}
return null;
} catch (err) {
runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`);
return null;
}
};
const resolveAllCites = async (content: unknown): Promise<string> => {
const cites = extractCites(content);
if (cites.length === 0) {
return "";
}
const resolved: string[] = [];
for (const cite of cites) {
const text = await resolveCiteContent(cite);
if (text) {
resolved.push(`> ${cite.author || "unknown"} wrote: ${text}`);
}
}
return resolved.length > 0 ? `${resolved.join("\n")}\n\n` : "";
};
return {
resolveCiteContent,
resolveAllCites,
};
}

View File

@ -9,22 +9,16 @@ import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js";
import type { Foreigns, DmInvite } from "../urbit/foreigns.js";
import { sendDm, sendGroupMessage } from "../urbit/send.js";
import { UrbitSSEClient } from "../urbit/sse-client.js";
import { createTlonApprovalRuntime } from "./approval-runtime.js";
import {
type PendingApproval,
type AdminCommand,
createPendingApproval,
formatApprovalRequest,
formatApprovalConfirmation,
parseApprovalResponse,
isApprovalResponse,
findPendingApproval,
removePendingApproval,
parseAdminCommand,
isAdminCommand,
formatBlockedList,
formatPendingList,
} from "./approval.js";
import { resolveChannelAuthorization } from "./authorization.js";
import { createTlonCitationResolver } from "./cites.js";
import { fetchAllChannels, fetchInitData } from "./discovery.js";
import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js";
import { downloadMessageImages } from "./media.js";
@ -270,412 +264,6 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
runtime.log?.("[tlon] No group channels to monitor (DMs only)");
}
// Helper to resolve cited message content
async function resolveCiteContent(cite: ParsedCite): Promise<string | null> {
if (cite.type !== "chan" || !cite.nest || !cite.postId) {
return null;
}
try {
// Scry for the specific post: /v4/{nest}/posts/post/{postId}
const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`;
runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`);
const data: any = await api!.scry(scryPath);
// Extract text from the post's essay content
if (data?.essay?.content) {
const text = extractMessageText(data.essay.content);
return text || null;
}
return null;
} catch (err) {
runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`);
return null;
}
}
// Resolve all cites in message content and return quoted text
async function resolveAllCites(content: unknown): Promise<string> {
const cites = extractCites(content);
if (cites.length === 0) {
return "";
}
const resolved: string[] = [];
for (const cite of cites) {
const text = await resolveCiteContent(cite);
if (text) {
const author = cite.author || "unknown";
resolved.push(`> ${author} wrote: ${text}`);
}
}
return resolved.length > 0 ? resolved.join("\n") + "\n\n" : "";
}
// Helper to save pending approvals to settings store
async function savePendingApprovals(): Promise<void> {
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "pendingApprovals",
value: JSON.stringify(pendingApprovals),
},
},
});
} catch (err) {
runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`);
}
}
// Helper to update dmAllowlist in settings store
async function addToDmAllowlist(ship: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
if (!effectiveDmAllowlist.includes(normalizedShip)) {
effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip];
}
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "dmAllowlist",
value: effectiveDmAllowlist,
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`);
}
}
// Helper to update channelRules in settings store
async function addToChannelAllowlist(ship: string, channelNest: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
const channelRules = currentSettings.channelRules ?? {};
const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] };
const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation
if (!allowedShips.includes(normalizedShip)) {
allowedShips.push(normalizedShip);
}
const updatedRules = {
...channelRules,
[channelNest]: { ...rule, allowedShips },
};
// Update local state immediately (don't wait for settings subscription)
currentSettings = { ...currentSettings, channelRules: updatedRules };
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "channelRules",
value: JSON.stringify(updatedRules),
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`);
}
}
// Helper to block a ship using Tlon's native blocking
async function blockShip(ship: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
try {
await api!.poke({
app: "chat",
mark: "chat-block-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`);
}
}
// Check if a ship is blocked using Tlon's native block list
async function isShipBlocked(ship: string): Promise<boolean> {
const normalizedShip = normalizeShip(ship);
try {
const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip);
} catch (err) {
runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`);
return false;
}
}
// Get all blocked ships
async function getBlockedShips(): Promise<string[]> {
try {
const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
return Array.isArray(blocked) ? blocked : [];
} catch (err) {
runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`);
return [];
}
}
// Helper to unblock a ship using Tlon's native blocking
async function unblockShip(ship: string): Promise<boolean> {
const normalizedShip = normalizeShip(ship);
try {
await api!.poke({
app: "chat",
mark: "chat-unblock-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`);
return true;
} catch (err) {
runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`);
return false;
}
}
// Helper to send DM notification to owner
async function sendOwnerNotification(message: string): Promise<void> {
if (!effectiveOwnerShip) {
runtime.log?.("[tlon] No ownerShip configured, cannot send notification");
return;
}
try {
await sendDm({
api: api!,
fromShip: botShipName,
toShip: effectiveOwnerShip,
text: message,
});
runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`);
}
}
// Queue a new approval request and notify the owner
async function queueApprovalRequest(approval: PendingApproval): Promise<void> {
// Check if ship is blocked - silently ignore
if (await isShipBlocked(approval.requestingShip)) {
runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`);
return;
}
// Check for duplicate - if found, update it with new content and re-notify
const existingIndex = pendingApprovals.findIndex(
(a) =>
a.type === approval.type &&
a.requestingShip === approval.requestingShip &&
(approval.type !== "channel" || a.channelNest === approval.channelNest) &&
(approval.type !== "group" || a.groupFlag === approval.groupFlag),
);
if (existingIndex !== -1) {
// Update existing approval with new content (preserves the original ID)
const existing = pendingApprovals[existingIndex];
if (approval.originalMessage) {
existing.originalMessage = approval.originalMessage;
existing.messagePreview = approval.messagePreview;
}
runtime.log?.(
`[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`,
);
await savePendingApprovals();
const message = formatApprovalRequest(existing);
await sendOwnerNotification(message);
return;
}
pendingApprovals.push(approval);
await savePendingApprovals();
const message = formatApprovalRequest(approval);
await sendOwnerNotification(message);
runtime.log?.(
`[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`,
);
}
// Process the owner's approval response
async function handleApprovalResponse(text: string): Promise<boolean> {
const parsed = parseApprovalResponse(text);
if (!parsed) {
return false;
}
const approval = findPendingApproval(pendingApprovals, parsed.id);
if (!approval) {
await sendOwnerNotification(
"No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""),
);
return true; // Still consumed the message
}
if (parsed.action === "approve") {
switch (approval.type) {
case "dm":
await addToDmAllowlist(approval.requestingShip);
// Process the original message if available
if (approval.originalMessage) {
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} after approval`,
);
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: false,
timestamp: approval.originalMessage.timestamp,
});
}
break;
case "channel":
if (approval.channelNest) {
await addToChannelAllowlist(approval.requestingShip, approval.channelNest);
// Process the original message if available
if (approval.originalMessage) {
const parsed = parseChannelNest(approval.channelNest);
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`,
);
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: true,
channelNest: approval.channelNest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: approval.originalMessage.timestamp,
parentId: approval.originalMessage.parentId,
isThreadReply: approval.originalMessage.isThreadReply,
});
}
}
break;
case "group":
// Accept the group invite (don't add to allowlist - each invite requires approval)
if (approval.groupFlag) {
try {
await api!.poke({
app: "groups",
mark: "group-join",
json: {
flag: approval.groupFlag,
"join-all": true,
},
});
runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`);
// Immediately discover channels from the newly joined group
// Small delay to allow the join to propagate
setTimeout(async () => {
try {
const discoveredChannels = await fetchAllChannels(api!, runtime);
let newCount = 0;
for (const channelNest of discoveredChannels) {
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
newCount++;
}
}
if (newCount > 0) {
runtime.log?.(
`[tlon] Discovered ${newCount} new channel(s) after joining group`,
);
}
} catch (err) {
runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`);
}
}, 2000);
} catch (err) {
runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`);
}
}
break;
}
await sendOwnerNotification(formatApprovalConfirmation(approval, "approve"));
} else if (parsed.action === "block") {
// Block the ship using Tlon's native blocking
await blockShip(approval.requestingShip);
await sendOwnerNotification(formatApprovalConfirmation(approval, "block"));
} else {
// Denied - just remove from pending, no notification to requester
await sendOwnerNotification(formatApprovalConfirmation(approval, "deny"));
}
// Remove from pending
pendingApprovals = removePendingApproval(pendingApprovals, approval.id);
await savePendingApprovals();
return true;
}
// Handle admin commands from owner (unblock, blocked, pending)
async function handleAdminCommand(text: string): Promise<boolean> {
const command = parseAdminCommand(text);
if (!command) {
return false;
}
switch (command.type) {
case "blocked": {
const blockedShips = await getBlockedShips();
await sendOwnerNotification(formatBlockedList(blockedShips));
runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`);
return true;
}
case "pending": {
await sendOwnerNotification(formatPendingList(pendingApprovals));
runtime.log?.(
`[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`,
);
return true;
}
case "unblock": {
const shipToUnblock = command.ship;
const isBlocked = await isShipBlocked(shipToUnblock);
if (!isBlocked) {
await sendOwnerNotification(`${shipToUnblock} is not blocked.`);
return true;
}
const success = await unblockShip(shipToUnblock);
if (success) {
await sendOwnerNotification(`Unblocked ${shipToUnblock}.`);
} else {
await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`);
}
return true;
}
}
}
// Check if a ship is the owner (always allowed to DM)
function isOwner(ship: string): boolean {
if (!effectiveOwnerShip) {
@ -1026,6 +614,79 @@ export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<v
const watchedChannels = new Set<string>(groupChannels);
const _watchedDMs = new Set<string>();
const refreshWatchedChannels = async (): Promise<number> => {
const discoveredChannels = await fetchAllChannels(api!, runtime);
let newCount = 0;
for (const channelNest of discoveredChannels) {
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
newCount++;
}
}
return newCount;
};
const { resolveAllCites } = createTlonCitationResolver({
api: { scry: (path) => api!.scry(path) },
runtime,
});
const { queueApprovalRequest, handleApprovalResponse, handleAdminCommand } =
createTlonApprovalRuntime({
api: {
poke: (payload) => api!.poke(payload),
scry: (path) => api!.scry(path),
},
runtime,
botShipName,
getPendingApprovals: () => pendingApprovals,
setPendingApprovals: (approvals) => {
pendingApprovals = approvals;
},
getCurrentSettings: () => currentSettings,
setCurrentSettings: (settings) => {
currentSettings = settings;
},
getEffectiveDmAllowlist: () => effectiveDmAllowlist,
setEffectiveDmAllowlist: (ships) => {
effectiveDmAllowlist = ships;
},
getEffectiveOwnerShip: () => effectiveOwnerShip,
processApprovedMessage: async (approval) => {
if (!approval.originalMessage) {
return;
}
if (approval.type === "dm") {
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: false,
timestamp: approval.originalMessage.timestamp,
});
return;
}
if (approval.type === "channel" && approval.channelNest) {
const parsedChannel = parseChannelNest(approval.channelNest);
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: true,
channelNest: approval.channelNest,
hostShip: parsedChannel?.hostShip,
channelName: parsedChannel?.channelName,
timestamp: approval.originalMessage.timestamp,
parentId: approval.originalMessage.parentId,
isThreadReply: approval.originalMessage.isThreadReply,
});
}
},
refreshWatchedChannels,
});
// Firehose handler for all channel messages (/v2)
const handleChannelsFirehose = async (event: any) => {
try {

View File

@ -80,10 +80,8 @@ vi.mock("../plugins/provider-runtime.js", async (importOriginal) => {
const thinkingLevel = skipReasoningInjection ? undefined : params.context.thinkingLevel;
return createOpenRouterSystemCacheWrapper(createOpenRouterWrapper(streamFn, thinkingLevel));
},
resolveProviderCapabilitiesWithPlugin: (params: {
provider: string;
workspaceDir?: string;
}) => resolveProviderCapabilitiesWithPluginMock(params),
resolveProviderCapabilitiesWithPlugin: (params: { provider: string; workspaceDir?: string }) =>
resolveProviderCapabilitiesWithPluginMock(params),
};
});

View File

@ -89,11 +89,14 @@ function hasOpenAiAnthropicToolPayloadCompatFlag(model: { compat?: unknown }): b
);
}
function requiresAnthropicToolPayloadCompatibilityForModel(model: {
api?: unknown;
provider?: unknown;
compat?: unknown;
}, options?: AnthropicToolPayloadResolverOptions): boolean {
function requiresAnthropicToolPayloadCompatibilityForModel(
model: {
api?: unknown;
provider?: unknown;
compat?: unknown;
},
options?: AnthropicToolPayloadResolverOptions,
): boolean {
if (model.api !== "anthropic-messages") {
return false;
}
@ -107,10 +110,13 @@ function requiresAnthropicToolPayloadCompatibilityForModel(model: {
return hasOpenAiAnthropicToolPayloadCompatFlag(model);
}
function usesOpenAiFunctionAnthropicToolSchemaForModel(model: {
provider?: unknown;
compat?: unknown;
}, options?: AnthropicToolPayloadResolverOptions): boolean {
function usesOpenAiFunctionAnthropicToolSchemaForModel(
model: {
provider?: unknown;
compat?: unknown;
},
options?: AnthropicToolPayloadResolverOptions,
): boolean {
if (
typeof model.provider === "string" &&
usesOpenAiFunctionAnthropicToolSchema(model.provider, options)
@ -120,10 +126,13 @@ function usesOpenAiFunctionAnthropicToolSchemaForModel(model: {
return hasOpenAiAnthropicToolPayloadCompatFlag(model);
}
function usesOpenAiStringModeAnthropicToolChoiceForModel(model: {
provider?: unknown;
compat?: unknown;
}, options?: AnthropicToolPayloadResolverOptions): boolean {
function usesOpenAiStringModeAnthropicToolChoiceForModel(
model: {
provider?: unknown;
compat?: unknown;
},
options?: AnthropicToolPayloadResolverOptions,
): boolean {
if (
typeof model.provider === "string" &&
usesOpenAiStringModeAnthropicToolChoice(model.provider, options)

View File

@ -1,6 +1,6 @@
import type { OpenClawConfig } from "../config/config.js";
import { resolveProviderCapabilitiesWithPlugin } from "../plugins/provider-runtime.js";
import { normalizeProviderId } from "./model-selection.js";
import type { OpenClawConfig } from "../config/config.js";
export type ProviderCapabilities = {
anthropicToolSchemaMode: "native" | "openai-functions";
@ -125,8 +125,7 @@ export function usesOpenAiStringModeAnthropicToolChoice(
options?: ProviderCapabilityLookupOptions,
): boolean {
return (
resolveProviderCapabilities(provider, options).anthropicToolChoiceMode ===
"openai-string-modes"
resolveProviderCapabilities(provider, options).anthropicToolChoiceMode === "openai-string-modes"
);
}

View File

@ -56,6 +56,10 @@ function normalizeTrimmedSet(
.filter((id): id is string => Boolean(id));
}
function objectValues<T>(value: Record<string, T> | undefined): T[] {
return Object.values(value ?? {});
}
export async function listSlackDirectoryPeersFromConfig(
params: DirectoryConfigParams,
): Promise<ChannelDirectoryEntry[]> {
@ -123,9 +127,9 @@ export async function listDiscordDirectoryPeersFromConfig(
account.config.allowFrom ?? account.config.dm?.allowFrom,
account.config.dms,
);
for (const guild of Object.values(account.config.guilds ?? {})) {
for (const guild of objectValues(account.config.guilds)) {
addTrimmedEntries(ids, guild.users ?? []);
for (const channel of Object.values(guild.channels ?? {})) {
for (const channel of objectValues(guild.channels)) {
addTrimmedEntries(ids, channel.users ?? []);
}
}
@ -153,7 +157,7 @@ export async function listDiscordDirectoryGroupsFromConfig(
return [];
}
const ids = new Set<string>();
for (const guild of Object.values(account.config.guilds ?? {})) {
for (const guild of objectValues(account.config.guilds)) {
addTrimmedEntries(ids, Object.keys(guild.channels ?? {}));
}

View File

@ -99,7 +99,8 @@ function readExportStatements(path: string): string[] {
return sourceFile.statements.flatMap((statement) => {
if (!ts.isExportDeclaration(statement)) {
if (!statement.modifiers?.some((modifier) => modifier.kind === ts.SyntaxKind.ExportKeyword)) {
const modifiers = ts.canHaveModifiers(statement) ? ts.getModifiers(statement) : undefined;
if (!modifiers?.some((modifier) => modifier.kind === ts.SyntaxKind.ExportKeyword)) {
return [];
}
return [statement.getText(sourceFile).replaceAll(/\s+/g, " ").trim()];

View File

@ -248,15 +248,16 @@ export const baseConfig = (): OpenClawConfig =>
channels: {
discord: {
accounts: {
default: {},
default: {
token: "MTIz.abc.def",
},
},
},
},
}) as OpenClawConfig;
vi.mock("@buape/carbon", () => {
class Command {}
class ReadyListener {}
vi.mock("@buape/carbon", async (importOriginal) => {
const actual = await importOriginal<typeof import("@buape/carbon")>();
class RateLimitError extends Error {
status = 429;
discordCode?: number;
@ -293,7 +294,7 @@ vi.mock("@buape/carbon", () => {
return clientGetPluginMock(name);
}
}
return { Client, Command, RateLimitError, ReadyListener };
return { ...actual, Client, RateLimitError };
});
vi.mock("@buape/carbon/gateway", () => ({
@ -463,7 +464,9 @@ vi.mock("../../../extensions/discord/src/monitor/provider.lifecycle.js", () => (
}));
vi.mock("../../../extensions/discord/src/monitor/rest-fetch.js", () => ({
resolveDiscordRestFetch: () => async () => undefined,
resolveDiscordRestFetch: () => async () => {
throw new Error("offline");
},
}));
vi.mock("../../../extensions/discord/src/monitor/thread-bindings.js", () => ({