openclaw/src/infra/outbound/session-binding-service.ts

333 lines
10 KiB
TypeScript

import { normalizeAccountId } from "../../routing/session-key.js";
export type BindingTargetKind = "subagent" | "session";
export type BindingStatus = "active" | "ending" | "ended";
export type SessionBindingPlacement = "current" | "child";
export type SessionBindingErrorCode =
| "BINDING_ADAPTER_UNAVAILABLE"
| "BINDING_CAPABILITY_UNSUPPORTED"
| "BINDING_CREATE_FAILED";
export type ConversationRef = {
channel: string;
accountId: string;
conversationId: string;
parentConversationId?: string;
};
export type SessionBindingRecord = {
bindingId: string;
targetSessionKey: string;
targetKind: BindingTargetKind;
conversation: ConversationRef;
status: BindingStatus;
boundAt: number;
expiresAt?: number;
metadata?: Record<string, unknown>;
};
export type SessionBindingBindInput = {
targetSessionKey: string;
targetKind: BindingTargetKind;
conversation: ConversationRef;
placement?: SessionBindingPlacement;
metadata?: Record<string, unknown>;
ttlMs?: number;
};
export type SessionBindingUnbindInput = {
bindingId?: string;
targetSessionKey?: string;
reason: string;
};
export type SessionBindingCapabilities = {
adapterAvailable: boolean;
bindSupported: boolean;
unbindSupported: boolean;
placements: SessionBindingPlacement[];
};
export class SessionBindingError extends Error {
constructor(
public readonly code: SessionBindingErrorCode,
message: string,
public readonly details?: {
channel?: string;
accountId?: string;
placement?: SessionBindingPlacement;
},
) {
super(message);
this.name = "SessionBindingError";
}
}
export function isSessionBindingError(error: unknown): error is SessionBindingError {
return error instanceof SessionBindingError;
}
export type SessionBindingService = {
bind: (input: SessionBindingBindInput) => Promise<SessionBindingRecord>;
getCapabilities: (params: { channel: string; accountId: string }) => SessionBindingCapabilities;
listBySession: (targetSessionKey: string) => SessionBindingRecord[];
resolveByConversation: (ref: ConversationRef) => SessionBindingRecord | null;
touch: (bindingId: string, at?: number) => void;
unbind: (input: SessionBindingUnbindInput) => Promise<SessionBindingRecord[]>;
};
export type SessionBindingAdapterCapabilities = {
placements?: SessionBindingPlacement[];
bindSupported?: boolean;
unbindSupported?: boolean;
};
export type SessionBindingAdapter = {
channel: string;
accountId: string;
capabilities?: SessionBindingAdapterCapabilities;
bind?: (input: SessionBindingBindInput) => Promise<SessionBindingRecord | null>;
listBySession: (targetSessionKey: string) => SessionBindingRecord[];
resolveByConversation: (ref: ConversationRef) => SessionBindingRecord | null;
touch?: (bindingId: string, at?: number) => void;
unbind?: (input: SessionBindingUnbindInput) => Promise<SessionBindingRecord[]>;
};
function normalizeConversationRef(ref: ConversationRef): ConversationRef {
return {
channel: ref.channel.trim().toLowerCase(),
accountId: normalizeAccountId(ref.accountId),
conversationId: ref.conversationId.trim(),
parentConversationId: ref.parentConversationId?.trim() || undefined,
};
}
function toAdapterKey(params: { channel: string; accountId: string }): string {
return `${params.channel.trim().toLowerCase()}:${normalizeAccountId(params.accountId)}`;
}
function normalizePlacement(raw: unknown): SessionBindingPlacement | undefined {
return raw === "current" || raw === "child" ? raw : undefined;
}
function inferDefaultPlacement(ref: ConversationRef): SessionBindingPlacement {
return ref.conversationId ? "current" : "child";
}
function resolveAdapterPlacements(adapter: SessionBindingAdapter): SessionBindingPlacement[] {
const configured = adapter.capabilities?.placements?.map((value) => normalizePlacement(value));
const placements = configured?.filter((value): value is SessionBindingPlacement =>
Boolean(value),
);
if (placements && placements.length > 0) {
return [...new Set(placements)];
}
return ["current", "child"];
}
function resolveAdapterCapabilities(
adapter: SessionBindingAdapter | null,
): SessionBindingCapabilities {
if (!adapter) {
return {
adapterAvailable: false,
bindSupported: false,
unbindSupported: false,
placements: [],
};
}
const bindSupported = adapter.capabilities?.bindSupported ?? Boolean(adapter.bind);
return {
adapterAvailable: true,
bindSupported,
unbindSupported: adapter.capabilities?.unbindSupported ?? Boolean(adapter.unbind),
placements: bindSupported ? resolveAdapterPlacements(adapter) : [],
};
}
const ADAPTERS_BY_CHANNEL_ACCOUNT = new Map<string, SessionBindingAdapter>();
export function registerSessionBindingAdapter(adapter: SessionBindingAdapter): void {
const normalizedAdapter = {
...adapter,
channel: adapter.channel.trim().toLowerCase(),
accountId: normalizeAccountId(adapter.accountId),
};
const key = toAdapterKey({
channel: normalizedAdapter.channel,
accountId: normalizedAdapter.accountId,
});
const existing = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
if (existing && existing !== adapter) {
throw new Error(
`Session binding adapter already registered for ${normalizedAdapter.channel}:${normalizedAdapter.accountId}`,
);
}
ADAPTERS_BY_CHANNEL_ACCOUNT.set(key, normalizedAdapter);
}
export function unregisterSessionBindingAdapter(params: {
channel: string;
accountId: string;
}): void {
ADAPTERS_BY_CHANNEL_ACCOUNT.delete(toAdapterKey(params));
}
function resolveAdapterForConversation(ref: ConversationRef): SessionBindingAdapter | null {
return resolveAdapterForChannelAccount({
channel: ref.channel,
accountId: ref.accountId,
});
}
function resolveAdapterForChannelAccount(params: {
channel: string;
accountId: string;
}): SessionBindingAdapter | null {
const key = toAdapterKey({
channel: params.channel,
accountId: params.accountId,
});
return ADAPTERS_BY_CHANNEL_ACCOUNT.get(key) ?? null;
}
function dedupeBindings(records: SessionBindingRecord[]): SessionBindingRecord[] {
const byId = new Map<string, SessionBindingRecord>();
for (const record of records) {
if (!record?.bindingId) {
continue;
}
byId.set(record.bindingId, record);
}
return [...byId.values()];
}
function createDefaultSessionBindingService(): SessionBindingService {
return {
bind: async (input) => {
const normalizedConversation = normalizeConversationRef(input.conversation);
const adapter = resolveAdapterForConversation(normalizedConversation);
if (!adapter) {
throw new SessionBindingError(
"BINDING_ADAPTER_UNAVAILABLE",
`Session binding adapter unavailable for ${normalizedConversation.channel}:${normalizedConversation.accountId}`,
{
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
},
);
}
if (!adapter.bind) {
throw new SessionBindingError(
"BINDING_CAPABILITY_UNSUPPORTED",
`Session binding adapter does not support binding for ${normalizedConversation.channel}:${normalizedConversation.accountId}`,
{
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
},
);
}
const placement =
normalizePlacement(input.placement) ?? inferDefaultPlacement(normalizedConversation);
const supportedPlacements = resolveAdapterPlacements(adapter);
if (!supportedPlacements.includes(placement)) {
throw new SessionBindingError(
"BINDING_CAPABILITY_UNSUPPORTED",
`Session binding placement "${placement}" is not supported for ${normalizedConversation.channel}:${normalizedConversation.accountId}`,
{
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
placement,
},
);
}
const bound = await adapter.bind({
...input,
conversation: normalizedConversation,
placement,
});
if (!bound) {
throw new SessionBindingError(
"BINDING_CREATE_FAILED",
"Session binding adapter failed to bind target conversation",
{
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
placement,
},
);
}
return bound;
},
getCapabilities: (params) => {
const adapter = resolveAdapterForChannelAccount({
channel: params.channel,
accountId: params.accountId,
});
return resolveAdapterCapabilities(adapter);
},
listBySession: (targetSessionKey) => {
const key = targetSessionKey.trim();
if (!key) {
return [];
}
const results: SessionBindingRecord[] = [];
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
const entries = adapter.listBySession(key);
if (entries.length > 0) {
results.push(...entries);
}
}
return dedupeBindings(results);
},
resolveByConversation: (ref) => {
const normalized = normalizeConversationRef(ref);
if (!normalized.channel || !normalized.conversationId) {
return null;
}
const adapter = resolveAdapterForConversation(normalized);
if (!adapter) {
return null;
}
return adapter.resolveByConversation(normalized);
},
touch: (bindingId, at) => {
const normalizedBindingId = bindingId.trim();
if (!normalizedBindingId) {
return;
}
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
adapter.touch?.(normalizedBindingId, at);
}
},
unbind: async (input) => {
const removed: SessionBindingRecord[] = [];
for (const adapter of ADAPTERS_BY_CHANNEL_ACCOUNT.values()) {
if (!adapter.unbind) {
continue;
}
const entries = await adapter.unbind(input);
if (entries.length > 0) {
removed.push(...entries);
}
}
return dedupeBindings(removed);
},
};
}
const DEFAULT_SESSION_BINDING_SERVICE = createDefaultSessionBindingService();
export function getSessionBindingService(): SessionBindingService {
return DEFAULT_SESSION_BINDING_SERVICE;
}
export const __testing = {
resetSessionBindingAdaptersForTests() {
ADAPTERS_BY_CHANNEL_ACCOUNT.clear();
},
getRegisteredAdapterKeys() {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()];
},
};