From 970167245e67f863044e502954d771179b45725a Mon Sep 17 00:00:00 2001 From: "yanlong.wang" Date: Thu, 13 Feb 2025 14:58:57 +0800 Subject: [PATCH] jina-ai: saas features --- jina-ai/src/patch-express.ts | 79 ++++++++++++++++++++++++++++++++---- jina-ai/src/server.ts | 4 +- src/agent.ts | 6 +++ 3 files changed, 79 insertions(+), 10 deletions(-) diff --git a/jina-ai/src/patch-express.ts b/jina-ai/src/patch-express.ts index e48381a..144e8ee 100644 --- a/jina-ai/src/patch-express.ts +++ b/jina-ai/src/patch-express.ts @@ -1,4 +1,4 @@ -import { ApplicationError, RPC_CALL_ENVIRONMENT } from "civkit/civ-rpc"; +import { ApplicationError, Prop, RPC_CALL_ENVIRONMENT } from "civkit/civ-rpc"; import { marshalErrorLike } from "civkit/lang"; import { randomUUID } from "crypto"; import { once } from "events"; @@ -9,16 +9,66 @@ import rateLimitControl, { API_CALL_STATUS, RateLimitDesc } from "./rate-limit"; import asyncLocalContext from "./lib/async-context"; import globalLogger from "./lib/logger"; import { InsufficientBalanceError } from "./lib/errors"; +import { FirestoreRecord } from "./lib/firestore"; globalLogger.serviceReady(); -const logger = globalLogger.child({ service: 'BillingMiddleware' }); - +const logger = globalLogger.child({ service: 'JinaAISaaSMiddleware' }); const appName = 'DEEPRESEARCH'; -export const jinaAiBillingMiddleware = (req: Request, res: Response, next: NextFunction) => { + +export class KnowledgeItem extends FirestoreRecord { + static override collectionName = 'knowledgeItems'; + + @Prop({ + required: true + }) + traceId!: string; + + @Prop({ + required: true + }) + uid!: string; + + @Prop({ + default: '' + }) + question!: string; + + @Prop({ + default: '' + }) + answer!: string; + + @Prop({ + default: '' + }) + type!: string; + + @Prop({ + arrayOf: Object, + default: [] + }) + references!: any[]; + + @Prop({ + defaultFactory: () => new Date() + }) + createdAt!: Date; + + @Prop({ + defaultFactory: () => new Date() + }) + updatedAt!: Date; +} + +export const jinaAiMiddleware = (req: Request, res: Response, next: NextFunction) => { if (req.path === '/ping') { res.status(200).end('pone'); return; } + if (req.path.startsWith('/v1/models')) { + next(); + return; + } if (req.method !== 'POST' && req.method !== 'GET') { next(); return; @@ -36,6 +86,7 @@ export const jinaAiBillingMiddleware = (req: Request, res: Response, next: NextF }); const user = await authDto.assertUser(); + const uid = await authDto.assertUID(); if (!(user.wallet.total_balance > 0)) { throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); } @@ -52,9 +103,9 @@ export const jinaAiBillingMiddleware = (req: Request, res: Response, next: NextF }) ]; const criterions = rateLimitPolicy.map((c) => rateLimitControl.rateLimitDescToCriterion(c)); - await Promise.all(criterions.map(([pointInTime, n]) => rateLimitControl.assertUidPeriodicLimit(user._id, pointInTime, n, appName))); + await Promise.all(criterions.map(([pointInTime, n]) => rateLimitControl.assertUidPeriodicLimit(uid, pointInTime, n, appName))); - const apiRoll = rateLimitControl.record({ uid: user._id, tags: [appName] }) + const apiRoll = rateLimitControl.record({ uid, tags: [appName] }) apiRoll.save().catch((err) => logger.warn(`Failed to save rate limit record`, { err: marshalErrorLike(err) })); const pResClose = once(res, 'close'); @@ -65,17 +116,29 @@ export const jinaAiBillingMiddleware = (req: Request, res: Response, next: NextF const chargeAmount = ctx.chargeAmount; if (chargeAmount) { authDto.reportUsage(chargeAmount, `reader-${appName}`).catch((err) => { - logger.warn(`Unable to report usage for ${user._id}`, { err: marshalErrorLike(err) }); + logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) }); }); apiRoll.chargeAmount = chargeAmount; } apiRoll.status = res.statusCode === 200 ? API_CALL_STATUS.SUCCESS : API_CALL_STATUS.ERROR; apiRoll.save().catch((err) => logger.warn(`Failed to save rate limit record`, { err: marshalErrorLike(err) })); logger.info(`HTTP ${res.statusCode} for request ${ctx.traceId} after ${Date.now() - ctx.traceT0.valueOf()}ms`, { - uid: user._id, + uid, chargeAmount, }); + if (ctx.promptContext.knowledge?.length) { + Promise.all(ctx.promptContext.knowledge.map((x: any) => KnowledgeItem.save( + KnowledgeItem.from({ + ...x, + uid, + traceId: ctx.traceId, + }) + ))).catch((err: any) => { + logger.warn(`Failed to save knowledge`, { err: marshalErrorLike(err) }); + }); + } + } catch (err: any) { if (!res.headersSent) { if (err instanceof ApplicationError) { diff --git a/jina-ai/src/server.ts b/jina-ai/src/server.ts index fdf31c2..6f9f4cc 100644 --- a/jina-ai/src/server.ts +++ b/jina-ai/src/server.ts @@ -1,12 +1,12 @@ import 'reflect-metadata' import express from 'express'; -import { jinaAiBillingMiddleware } from "./patch-express"; +import { jinaAiMiddleware } from "./patch-express"; import { Server } from 'http'; const app = require('../..').default; const rootApp = express(); -rootApp.use(jinaAiBillingMiddleware, app); +rootApp.use(jinaAiMiddleware, app); const port = process.env.PORT || 3000; diff --git a/src/agent.ts b/src/agent.ts index 4649953..819a882 100644 --- a/src/agent.ts +++ b/src/agent.ts @@ -719,6 +719,12 @@ You decided to think out of the box or cut from a completely different angle.`); } async function storeContext(prompt: string, schema: any, memory: any[][], step: number) { + if ((process as any).asyncLocalContext?.available?.()) { + const [context, keywords, questions, knowledge] = memory; + (process as any).asyncLocalContext.ctx.promptContext = { prompt, schema, context, keywords, questions, knowledge, step }; + return; + } + try { await fs.writeFile(`prompt-${step}.txt`, ` Prompt: