feat(api): 支持电池分页和安全预测
This commit is contained in:
@@ -4,10 +4,23 @@ import { batteriesResponseSchema, dashboardSnapshotSchema } from '@/domain/batte
|
||||
|
||||
export const dashboard = oc.input(z.void()).output(dashboardSnapshotSchema)
|
||||
|
||||
const batteryListInputSchema = z.object({
|
||||
pageSize: z.number().int().min(1).max(100).default(50),
|
||||
cursor: z.string().min(1).optional(),
|
||||
search: z.string().trim().min(1).max(100).optional(),
|
||||
lowPower: z.boolean().optional(),
|
||||
powerStatus: z.union([z.literal(0), z.literal(1), z.literal(2)]).optional(),
|
||||
sort: z.enum(['createdAtDesc', 'createdAtAsc', 'powerDesc', 'powerAsc']).default('createdAtDesc'),
|
||||
})
|
||||
|
||||
export const batteries = oc
|
||||
.input(batteryListInputSchema)
|
||||
.output(batteriesResponseSchema)
|
||||
|
||||
export const history = oc
|
||||
.input(
|
||||
z.object({
|
||||
mac: z.string().min(1).optional(),
|
||||
mac: z.string().min(1),
|
||||
}),
|
||||
)
|
||||
.output(batteriesResponseSchema)
|
||||
|
||||
@@ -1,19 +1,44 @@
|
||||
import { createBatteriesResponse, createDashboardSnapshot } from '@/domain/battery'
|
||||
import { os } from '@/server/api/server'
|
||||
import { getBatteryHistory, getBatteryPredictionHistory, getLatestBatteryPerDevice } from '@/server/battery/mysql'
|
||||
import {
|
||||
getBatteryHistory,
|
||||
getBatteryPredictionHistories,
|
||||
getLatestBatteryPage,
|
||||
getLatestBatteryPerDevice,
|
||||
} from '@/server/battery/mysql'
|
||||
import { isPredictionEnabled, predictSoh } from '@/server/prediction/client'
|
||||
|
||||
const dashboardPredictionConcurrency = 5
|
||||
|
||||
async function mapWithConcurrency<T, R>(items: T[], concurrency: number, handler: (item: T) => Promise<R>): Promise<R[]> {
|
||||
const results: R[] = []
|
||||
let nextIndex = 0
|
||||
|
||||
async function worker() {
|
||||
while (nextIndex < items.length) {
|
||||
const index = nextIndex
|
||||
nextIndex += 1
|
||||
const item = items[index]
|
||||
if (item !== undefined) results[index] = await handler(item)
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, worker))
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
export const dashboard = os.battery.dashboard.handler(async () => {
|
||||
const items = await getLatestBatteryPerDevice()
|
||||
const predictionHistories = isPredictionEnabled()
|
||||
? await getBatteryPredictionHistories(items.map((item) => item.mac))
|
||||
: new Map()
|
||||
const predictionEntries = isPredictionEnabled()
|
||||
? await Promise.all(
|
||||
items.map(async (item) => {
|
||||
const history = await getBatteryPredictionHistory(item.mac)
|
||||
const prediction = await predictSoh(item, history)
|
||||
? await mapWithConcurrency(items, dashboardPredictionConcurrency, async (item) => {
|
||||
const prediction = await predictSoh(item, predictionHistories.get(item.mac) ?? [])
|
||||
|
||||
return prediction ? ([item.mac, prediction] as const) : null
|
||||
}),
|
||||
)
|
||||
})
|
||||
: []
|
||||
const predictions = new Map(predictionEntries.filter((entry) => entry !== null))
|
||||
|
||||
@@ -21,7 +46,18 @@ export const dashboard = os.battery.dashboard.handler(async () => {
|
||||
})
|
||||
|
||||
export const batteries = os.battery.batteries.handler(async ({ input }) => {
|
||||
const items = input.mac ? await getBatteryHistory(input.mac) : await getLatestBatteryPerDevice()
|
||||
const page = await getLatestBatteryPage(input)
|
||||
|
||||
return createBatteriesResponse(
|
||||
page.items,
|
||||
new Date(),
|
||||
{ total: page.total, lowPower: page.lowPower, charging: page.charging },
|
||||
page.nextCursor,
|
||||
)
|
||||
})
|
||||
|
||||
export const history = os.battery.history.handler(async ({ input }) => {
|
||||
const items = await getBatteryHistory(input.mac)
|
||||
|
||||
return createBatteriesResponse(items)
|
||||
})
|
||||
|
||||
+236
-10
@@ -5,8 +5,40 @@ import { env } from '@/env'
|
||||
|
||||
const historyLimit = 500
|
||||
const predictionHistoryLimit = 10
|
||||
const dashboardLatestLimit = 100
|
||||
|
||||
type BatteryInfoMysqlRow = RowDataPacket & BatteryInfoSourceRow
|
||||
type CountMysqlRow = RowDataPacket & {
|
||||
total: number
|
||||
lowPower: number | string | null
|
||||
charging: number | string | null
|
||||
}
|
||||
|
||||
export type BatteryListSort = 'createdAtDesc' | 'createdAtAsc' | 'powerDesc' | 'powerAsc'
|
||||
|
||||
export type LatestBatteryPageInput = {
|
||||
pageSize: number
|
||||
cursor?: string
|
||||
search?: string
|
||||
lowPower?: boolean
|
||||
powerStatus?: 0 | 1 | 2
|
||||
sort?: BatteryListSort
|
||||
}
|
||||
|
||||
export type LatestBatteryPage = {
|
||||
items: BatteryInfo[]
|
||||
nextCursor: string | null
|
||||
total?: number
|
||||
lowPower?: number
|
||||
charging?: number
|
||||
}
|
||||
|
||||
type PageCursor = {
|
||||
sort: BatteryListSort
|
||||
createTime: string
|
||||
id: number
|
||||
power?: number
|
||||
}
|
||||
|
||||
let pool: Pool | undefined
|
||||
|
||||
@@ -41,6 +73,130 @@ const sourceColumns = `
|
||||
remark
|
||||
`
|
||||
|
||||
const normalizedColumns = `
|
||||
id,
|
||||
userId,
|
||||
mac,
|
||||
devModel,
|
||||
devName,
|
||||
isLowPower,
|
||||
powerStatus,
|
||||
power,
|
||||
createTime,
|
||||
remark
|
||||
`
|
||||
|
||||
const latestRecordPredicate = `
|
||||
NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM ls_battery_info AS newer_record
|
||||
WHERE newer_record.mac = current_record.mac
|
||||
AND (
|
||||
newer_record.create_time > current_record.create_time
|
||||
OR (newer_record.create_time = current_record.create_time AND newer_record.id > current_record.id)
|
||||
)
|
||||
)
|
||||
`
|
||||
|
||||
const orderByBySort: Record<BatteryListSort, string> = {
|
||||
createdAtDesc: 'current_record.create_time DESC, current_record.id DESC',
|
||||
createdAtAsc: 'current_record.create_time ASC, current_record.id ASC',
|
||||
powerDesc: 'current_record.power DESC, current_record.create_time DESC, current_record.id DESC',
|
||||
powerAsc: 'current_record.power ASC, current_record.create_time DESC, current_record.id DESC',
|
||||
}
|
||||
|
||||
function toNumber(value: number | string | null | undefined) {
|
||||
if (value === null || value === undefined) return 0
|
||||
return Number(value)
|
||||
}
|
||||
|
||||
function encodeCursor(item: BatteryInfo, sort: BatteryListSort) {
|
||||
const cursor: PageCursor = {
|
||||
sort,
|
||||
createTime: item.createTime,
|
||||
id: item.id,
|
||||
power: sort === 'powerAsc' || sort === 'powerDesc' ? item.power : undefined,
|
||||
}
|
||||
|
||||
return Buffer.from(JSON.stringify(cursor)).toString('base64url')
|
||||
}
|
||||
|
||||
function decodeCursor(value: string | undefined, sort: BatteryListSort): PageCursor | null {
|
||||
if (!value) return null
|
||||
|
||||
try {
|
||||
const decoded = JSON.parse(Buffer.from(value, 'base64url').toString('utf8')) as Partial<PageCursor>
|
||||
if (decoded.sort !== sort || typeof decoded.createTime !== 'string' || typeof decoded.id !== 'number') return null
|
||||
if ((sort === 'powerAsc' || sort === 'powerDesc') && typeof decoded.power !== 'number') return null
|
||||
|
||||
return decoded as PageCursor
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
function escapeLike(value: string) {
|
||||
return value.replace(/[\\%_]/g, (match) => `\\${match}`)
|
||||
}
|
||||
|
||||
function normalizeCursorDateTime(value: string) {
|
||||
return value.includes('T') ? value.slice(0, 19).replace('T', ' ') : value
|
||||
}
|
||||
|
||||
function createLatestWhere(input: LatestBatteryPageInput, cursor: PageCursor | null) {
|
||||
const clauses = [latestRecordPredicate]
|
||||
const params: Record<string, string | number> = {}
|
||||
|
||||
if (input.search) {
|
||||
clauses.push(
|
||||
'(current_record.mac LIKE :search ESCAPE \'\\\\\' OR current_record.dev_name LIKE :search ESCAPE \'\\\\\' OR current_record.dev_model LIKE :search ESCAPE \'\\\\\')',
|
||||
)
|
||||
params.search = `%${escapeLike(input.search)}%`
|
||||
}
|
||||
|
||||
if (input.lowPower !== undefined) {
|
||||
clauses.push('current_record.is_low_power = :lowPower')
|
||||
params.lowPower = input.lowPower ? 'true' : 'false'
|
||||
}
|
||||
|
||||
if (input.powerStatus !== undefined) {
|
||||
clauses.push('current_record.power_status = :powerStatus')
|
||||
params.powerStatus = input.powerStatus
|
||||
}
|
||||
|
||||
if (cursor) {
|
||||
params.cursorCreateTime = normalizeCursorDateTime(cursor.createTime)
|
||||
params.cursorId = cursor.id
|
||||
|
||||
switch (input.sort ?? 'createdAtDesc') {
|
||||
case 'createdAtAsc':
|
||||
clauses.push(
|
||||
'(current_record.create_time > :cursorCreateTime OR (current_record.create_time = :cursorCreateTime AND current_record.id > :cursorId))',
|
||||
)
|
||||
break
|
||||
case 'powerDesc':
|
||||
params.cursorPower = cursor.power ?? 0
|
||||
clauses.push(
|
||||
'(current_record.power < :cursorPower OR (current_record.power = :cursorPower AND (current_record.create_time < :cursorCreateTime OR (current_record.create_time = :cursorCreateTime AND current_record.id < :cursorId))))',
|
||||
)
|
||||
break
|
||||
case 'powerAsc':
|
||||
params.cursorPower = cursor.power ?? 0
|
||||
clauses.push(
|
||||
'(current_record.power > :cursorPower OR (current_record.power = :cursorPower AND (current_record.create_time < :cursorCreateTime OR (current_record.create_time = :cursorCreateTime AND current_record.id < :cursorId))))',
|
||||
)
|
||||
break
|
||||
case 'createdAtDesc':
|
||||
clauses.push(
|
||||
'(current_record.create_time < :cursorCreateTime OR (current_record.create_time = :cursorCreateTime AND current_record.id < :cursorId))',
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return { whereSql: clauses.map((clause) => `(${clause})`).join(' AND '), params }
|
||||
}
|
||||
|
||||
export async function getBatteryHistory(mac: string): Promise<BatteryInfo[]> {
|
||||
const [rows] = await getBatteryPool().query<BatteryInfoMysqlRow[]>(
|
||||
`
|
||||
@@ -71,21 +227,91 @@ export async function getBatteryPredictionHistory(mac: string): Promise<BatteryI
|
||||
return rows.map(toBatteryInfo).reverse()
|
||||
}
|
||||
|
||||
export async function getLatestBatteryPerDevice(): Promise<BatteryInfo[]> {
|
||||
const [rows] = await getBatteryPool().query<BatteryInfoMysqlRow[]>(`
|
||||
export async function getBatteryPredictionHistories(macAddresses: string[]): Promise<Map<string, BatteryInfo[]>> {
|
||||
if (macAddresses.length === 0) return new Map()
|
||||
|
||||
const params = Object.fromEntries(macAddresses.map((mac, index) => [`mac${index}`, mac]))
|
||||
const placeholders = macAddresses.map((_, index) => `:mac${index}`).join(', ')
|
||||
const [rows] = await getBatteryPool().query<BatteryInfoMysqlRow[]>(
|
||||
`
|
||||
SELECT ${normalizedColumns}
|
||||
FROM (
|
||||
SELECT
|
||||
${sourceColumns},
|
||||
ROW_NUMBER() OVER (PARTITION BY mac ORDER BY create_time DESC, id DESC) AS history_rank
|
||||
FROM ls_battery_info
|
||||
WHERE mac IN (${placeholders})
|
||||
) AS ranked_history
|
||||
WHERE ranked_history.history_rank <= :limit
|
||||
ORDER BY ranked_history.mac ASC, ranked_history.createTime ASC, ranked_history.id ASC
|
||||
`,
|
||||
{ ...params, limit: predictionHistoryLimit },
|
||||
)
|
||||
|
||||
const histories = new Map<string, BatteryInfo[]>()
|
||||
for (const item of rows.map(toBatteryInfo)) {
|
||||
histories.set(item.mac, [...(histories.get(item.mac) ?? []), item])
|
||||
}
|
||||
|
||||
return histories
|
||||
}
|
||||
|
||||
export async function getLatestBatteryPage(input: LatestBatteryPageInput): Promise<LatestBatteryPage> {
|
||||
const sort = input.sort ?? 'createdAtDesc'
|
||||
const pageSize = Math.min(Math.max(input.pageSize, 1), 100)
|
||||
const cursor = decodeCursor(input.cursor, sort)
|
||||
const { whereSql, params } = createLatestWhere({ ...input, sort, pageSize }, cursor)
|
||||
const countWhere = createLatestWhere({ ...input, sort, pageSize }, null)
|
||||
const queryLimit = pageSize + 1
|
||||
|
||||
const [rows] = await getBatteryPool().query<BatteryInfoMysqlRow[]>(
|
||||
`
|
||||
SELECT ${sourceColumns}
|
||||
FROM ls_battery_info AS current_record
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM ls_battery_info AS newer_record
|
||||
WHERE newer_record.mac = current_record.mac
|
||||
AND (
|
||||
newer_record.create_time > current_record.create_time
|
||||
OR (newer_record.create_time = current_record.create_time AND newer_record.id > current_record.id)
|
||||
WHERE ${whereSql}
|
||||
ORDER BY ${orderByBySort[sort]}
|
||||
LIMIT :limit
|
||||
`,
|
||||
{ ...params, limit: queryLimit },
|
||||
)
|
||||
|
||||
const pageItems = rows.slice(0, pageSize).map(toBatteryInfo)
|
||||
const lastPageItem = pageItems.at(-1)
|
||||
const nextCursor = rows.length > pageSize && lastPageItem ? encodeCursor(lastPageItem, sort) : null
|
||||
|
||||
const [countRows] = await getBatteryPool().query<CountMysqlRow[]>(
|
||||
`
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
COALESCE(SUM(CASE WHEN current_record.is_low_power = 'true' THEN 1 ELSE 0 END), 0) AS lowPower,
|
||||
COALESCE(SUM(CASE WHEN current_record.power_status = 1 THEN 1 ELSE 0 END), 0) AS charging
|
||||
FROM ls_battery_info AS current_record
|
||||
WHERE ${countWhere.whereSql}
|
||||
`,
|
||||
countWhere.params,
|
||||
)
|
||||
const counts = countRows[0]
|
||||
|
||||
return {
|
||||
items: pageItems,
|
||||
nextCursor,
|
||||
total: toNumber(counts?.total),
|
||||
lowPower: toNumber(counts?.lowPower),
|
||||
charging: toNumber(counts?.charging),
|
||||
}
|
||||
}
|
||||
|
||||
export async function getLatestBatteryPerDevice(limit = dashboardLatestLimit): Promise<BatteryInfo[]> {
|
||||
const [rows] = await getBatteryPool().query<BatteryInfoMysqlRow[]>(
|
||||
`
|
||||
SELECT ${sourceColumns}
|
||||
FROM ls_battery_info AS current_record
|
||||
WHERE ${latestRecordPredicate}
|
||||
ORDER BY current_record.create_time DESC, current_record.id DESC
|
||||
`)
|
||||
LIMIT :limit
|
||||
`,
|
||||
{ limit: Math.min(Math.max(limit, 1), dashboardLatestLimit) },
|
||||
)
|
||||
|
||||
return rows.map(toBatteryInfo)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { LRUCache } from 'lru-cache'
|
||||
import { z } from 'zod'
|
||||
import type { BatteryInfo, BatteryPrediction } from '@/domain/battery'
|
||||
import { env } from '@/env'
|
||||
@@ -60,13 +61,12 @@ const predictionResponseSchema = z.object({
|
||||
updated_at: z.string().nullable().optional(),
|
||||
})
|
||||
|
||||
type CacheEntry = {
|
||||
expiresAt: number
|
||||
value: SohPrediction
|
||||
}
|
||||
|
||||
const logger = getLogger(['prediction'])
|
||||
const cache = new Map<string, CacheEntry>()
|
||||
const cache = new LRUCache<string, SohPrediction>({
|
||||
max: 5_000,
|
||||
ttl: env.SOH_PREDICTION_CACHE_TTL_SECONDS * 1000,
|
||||
})
|
||||
const inFlightRequests = new Map<string, Promise<SohPrediction | null>>()
|
||||
|
||||
const round2 = (value: number) => Math.round(value * 100) / 100
|
||||
|
||||
@@ -155,7 +155,22 @@ export async function predictSoh(battery: BatteryInfo, history: BatteryInfo[]):
|
||||
|
||||
const cacheKey = createCacheKey(battery, history)
|
||||
const cached = cache.get(cacheKey)
|
||||
if (cached && cached.expiresAt > Date.now()) return cached.value
|
||||
if (cached) return cached
|
||||
const pendingRequest = inFlightRequests.get(cacheKey)
|
||||
if (pendingRequest) return pendingRequest
|
||||
|
||||
const requestPromise = requestPrediction(cacheKey, battery, request)
|
||||
inFlightRequests.set(cacheKey, requestPromise)
|
||||
|
||||
return requestPromise
|
||||
}
|
||||
|
||||
async function requestPrediction(
|
||||
cacheKey: string,
|
||||
battery: BatteryInfo,
|
||||
request: PredictionRequest,
|
||||
): Promise<SohPrediction | null> {
|
||||
if (!env.SOH_PREDICTION_API_BASE_URL) return null
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeout = setTimeout(() => controller.abort(), env.SOH_PREDICTION_TIMEOUT_MS)
|
||||
@@ -179,10 +194,7 @@ export async function predictSoh(battery: BatteryInfo, history: BatteryInfo[]):
|
||||
|
||||
const json = await response.json()
|
||||
const prediction = normalizePrediction(predictionResponseSchema.parse(json))
|
||||
cache.set(cacheKey, {
|
||||
expiresAt: Date.now() + env.SOH_PREDICTION_CACHE_TTL_SECONDS * 1000,
|
||||
value: prediction,
|
||||
})
|
||||
cache.set(cacheKey, prediction)
|
||||
|
||||
return prediction
|
||||
} catch (error) {
|
||||
@@ -190,9 +202,11 @@ export async function predictSoh(battery: BatteryInfo, history: BatteryInfo[]):
|
||||
return null
|
||||
} finally {
|
||||
clearTimeout(timeout)
|
||||
inFlightRequests.delete(cacheKey)
|
||||
}
|
||||
}
|
||||
|
||||
export function clearPredictionCache() {
|
||||
cache.clear()
|
||||
inFlightRequests.clear()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user