shieldx/tests/unit/validation/CanaryManager.test.ts
Rene Fichtmueller 1c4c034483 feat: ShieldX v0.3.0 — UnicodeScanner (L5), DNS Covert Channel rules, ATLAS v5.4 mappings
- Layer 4 EntropyScanner: Shannon entropy, Base32/Base64 detection, CVE-2025-55284
  ping/nslookup exfil, EchoLeak markdown pattern, DNS tunneling (iodine/dnscat)
- Layer 5 UnicodeScanner: ASCII Smuggling (U+E0000 Tags Block), Variant Selectors,
  Zero-Width steganography, CamoLeak image-ordering (CVE-2025-53773), homoglyphs,
  BiDi override, high-entropy URL params
- 30 DNS covert channel rules (dns-001 to dns-030)
- ATLASMapper: 29 techniques (ATLAS v5.4.0 Feb 2026), added AML.T0062 (Agent Tool
  Invocation), AML.TA0015 (C2 tactic), memory poisoning, multi-agent trust,
  CamoLeak, Unicode steganography mappings
- Rule count: 72 → 102
- Build: tsup 316ms, zero TypeScript errors
2026-03-31 16:32:16 +02:00

186 lines
6.4 KiB
TypeScript

import { describe, it, expect, beforeEach } from 'vitest'
import { CanaryManager } from '../../../src/validation/CanaryManager.js'
describe('CanaryManager', () => {
let manager: CanaryManager
beforeEach(() => {
manager = new CanaryManager(3, 3_600_000, 16, 'SX_CANARY_')
})
describe('token generation', () => {
it('should generate the initial number of tokens', () => {
const tokens = manager.getActiveTokens()
expect(tokens).toHaveLength(3)
})
it('should generate unique tokens', () => {
const tokens = manager.getActiveTokens()
const uniqueTokens = new Set(tokens)
expect(uniqueTokens.size).toBe(tokens.length)
})
it('should generate tokens with the configured prefix', () => {
const tokens = manager.getActiveTokens()
for (const token of tokens) {
expect(token.startsWith('SX_CANARY_')).toBe(true)
}
})
it('should generate tokens with random hex content', () => {
const tokens = manager.getActiveTokens()
for (const token of tokens) {
const hex = token.replace('SX_CANARY_', '')
expect(hex).toMatch(/^[a-f0-9]+$/)
expect(hex.length).toBe(32) // 16 bytes * 2 hex chars
}
})
it('should generate new token via generateToken()', () => {
const initialCount = manager.getActiveTokens().length
const newToken = manager.generateToken()
expect(newToken.startsWith('SX_CANARY_')).toBe(true)
expect(manager.getActiveTokens().length).toBe(initialCount + 1)
})
it('should generate multiple tokens via generateTokens()', () => {
const initialCount = manager.getActiveTokens().length
const newTokens = manager.generateTokens(5)
expect(newTokens).toHaveLength(5)
expect(manager.getActiveTokens().length).toBe(initialCount + 5)
})
it('should generate different tokens on each call', () => {
const token1 = manager.generateToken()
const token2 = manager.generateToken()
expect(token1).not.toBe(token2)
})
})
describe('leak detection', () => {
it('should detect leaked canary token in output', () => {
const tokens = manager.getActiveTokens()
const leakedToken = tokens[0]!
const output = `Here is the response. ${leakedToken} And some more text.`
const result = manager.checkOutput(output)
expect(result.leaked).toBe(true)
expect(result.leakedTokens).toContain(leakedToken)
})
it('should detect multiple leaked tokens', () => {
const tokens = manager.getActiveTokens()
const output = `Result: ${tokens[0]} and also ${tokens[1]}`
const result = manager.checkOutput(output)
expect(result.leaked).toBe(true)
expect(result.leakedTokens.length).toBe(2)
})
it('should not report leak when no tokens are in output', () => {
const output = 'This is a clean output with no canary tokens.'
const result = manager.checkOutput(output)
expect(result.leaked).toBe(false)
expect(result.leakedTokens).toHaveLength(0)
})
it('should check against custom token list when provided', () => {
const customTokens = ['CUSTOM_TOKEN_1', 'CUSTOM_TOKEN_2']
const output = 'Output contains CUSTOM_TOKEN_1 somewhere.'
const result = manager.checkOutput(output, customTokens)
expect(result.leaked).toBe(true)
expect(result.leakedTokens).toContain('CUSTOM_TOKEN_1')
})
it('should return frozen result', () => {
const result = manager.checkOutput('no tokens here')
expect(Object.isFrozen(result)).toBe(true)
expect(Object.isFrozen(result.leakedTokens)).toBe(true)
})
})
describe('token rotation', () => {
it('should replace all tokens on rotation', () => {
const oldTokens = [...manager.getActiveTokens()]
const newTokens = manager.rotateTokens()
expect(newTokens.length).toBe(oldTokens.length)
// New tokens should be different from old tokens
for (const oldToken of oldTokens) {
expect(newTokens).not.toContain(oldToken)
}
})
it('should maintain the same token count after rotation', () => {
const countBefore = manager.getActiveTokens().length
manager.rotateTokens()
const countAfter = manager.getActiveTokens().length
expect(countAfter).toBe(countBefore)
})
it('should detect leak after rotation uses new tokens', () => {
manager.rotateTokens()
const newTokens = manager.getActiveTokens()
const output = `Leaked: ${newTokens[0]}`
const result = manager.checkOutput(output)
expect(result.leaked).toBe(true)
})
it('should not detect old tokens after rotation', () => {
const oldTokens = [...manager.getActiveTokens()]
manager.rotateTokens()
const output = `Old token: ${oldTokens[0]}`
const result = manager.checkOutput(output)
expect(result.leaked).toBe(false)
})
})
describe('isRotationDue()', () => {
it('should return false immediately after construction', () => {
expect(manager.isRotationDue()).toBe(false)
})
it('should return false immediately after rotation', () => {
manager.rotateTokens()
expect(manager.isRotationDue()).toBe(false)
})
it('should return true when rotation interval has passed', () => {
// Create manager with very short rotation interval
const shortManager = new CanaryManager(1, 1, 8, 'TEST_')
// Wait briefly to exceed 1ms interval
const start = Date.now()
while (Date.now() - start < 5) {
// busy wait
}
expect(shortManager.isRotationDue()).toBe(true)
})
})
describe('getActiveTokens()', () => {
it('should return a frozen copy of tokens', () => {
const tokens = manager.getActiveTokens()
expect(Object.isFrozen(tokens)).toBe(true)
})
it('should return all active tokens', () => {
const tokens = manager.getActiveTokens()
expect(tokens.length).toBeGreaterThan(0)
})
})
describe('custom configuration', () => {
it('should support custom prefix', () => {
const custom = new CanaryManager(2, 1000, 8, 'MY_PREFIX_')
const tokens = custom.getActiveTokens()
for (const token of tokens) {
expect(token.startsWith('MY_PREFIX_')).toBe(true)
}
})
it('should support custom token length', () => {
const custom = new CanaryManager(1, 1000, 8, 'P_')
const tokens = custom.getActiveTokens()
const hex = tokens[0]!.replace('P_', '')
expect(hex.length).toBe(16) // 8 bytes * 2 hex chars
})
})
})