Skip to content

feat(splitter,pipeline): robust, context-aware JSON splitting and pipeline integration (#7) #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/scraper/fetcher/FileFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export class FileFetcher implements ContentFetcher {
return "text/markdown";
case ".txt":
return "text/plain";
case ".json":
return "application/json";
default:
return "application/octet-stream";
}
Expand Down
114 changes: 114 additions & 0 deletions src/scraper/pipelines/JsonPipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { describe, expect, it } from "vitest";
import type { RawContent } from "../fetcher/types";
import { JsonPipeline } from "./JsonPipeline";

/** Serializes any value as 2-space-indented JSON so test assertions stay readable. */
function pretty(json: unknown) {
  return JSON.stringify(json, undefined, 2);
}

// Minimal valid ScraperOptions for tests
// NOTE(review): only url/library/version are supplied; assumes the remaining
// ScraperOptions fields (maxPages, maxDepth, ...) are optional — confirm
// against the ScraperOptions type definition.
const dummyOptions = {
url: "test.json",
library: "test",
version: "1.0",
};
// Dummy ContentFetcher implementation
// Inert stubs: JsonPipeline.process ignores its fetcher argument, so these
// members are never invoked in this suite.
const dummyFetcher = {
canFetch: () => false,
fetch: async () => Promise.reject(new Error("Not implemented")),
};

describe("JsonPipeline", () => {
  /** Wraps a JSON string in the RawContent shape used by every process() test. */
  const makeRaw = (content: string): RawContent => ({
    content,
    mimeType: "application/json",
    source: "test.json",
  });

  it("canProcess returns true for JSON MIME types", () => {
    const pipe = new JsonPipeline();
    [
      "application/json",
      "application/ld+json",
      "application/vnd.api+json",
      "text/json",
      "application/json5",
    ].forEach((mimeType) => {
      expect(pipe.canProcess({ mimeType } as RawContent)).toBe(true);
    });
  });

  it("canProcess returns false for non-JSON MIME types", () => {
    const pipe = new JsonPipeline();
    [
      "text/html",
      "text/plain",
      "application/xml",
      "text/markdown",
      "image/png",
    ].forEach((mimeType) => {
      expect(pipe.canProcess({ mimeType } as RawContent)).toBe(false);
    });
  });

  it("splits large JSON arrays into valid JSON chunks", async () => {
    const pipe = new JsonPipeline();
    const items = Array.from({ length: 100 }, (_, i) => ({ id: i, value: `item${i}` }));
    const result = await pipe.process(makeRaw(pretty(items)), dummyOptions, dummyFetcher);
    const chunks = result.textContent.split("\n");
    // Each newline-delimited chunk must be parseable on its own...
    chunks.forEach((chunk) => {
      expect(() => JSON.parse(chunk)).not.toThrow();
    });
    // ...and together the chunks must cover every original item.
    const allItems = chunks.flatMap((chunk) => JSON.parse(chunk));
    expect(allItems.length).toBe(100);
  });

  it("splits large JSON objects into valid JSON chunks", async () => {
    const pipe = new JsonPipeline();
    const record: Record<string, unknown> = Object.fromEntries(
      Array.from({ length: 100 }, (_, i) => [`key${i}`, { id: i, value: `item${i}` }]),
    );
    const result = await pipe.process(makeRaw(pretty(record)), dummyOptions, dummyFetcher);
    const chunks = result.textContent.split("\n");
    chunks.forEach((chunk) => {
      expect(() => JSON.parse(chunk)).not.toThrow();
    });
    // Union of keys across chunks must equal the 100 original keys.
    const allKeys = chunks.flatMap((chunk) => Object.keys(JSON.parse(chunk)));
    expect(new Set(allKeys).size).toBe(100);
  });

  it("handles small JSON files as a single chunk", async () => {
    const pipe = new JsonPipeline();
    const result = await pipe.process(
      makeRaw(pretty({ foo: 1, bar: [1, 2, 3] })),
      dummyOptions,
      dummyFetcher,
    );
    // A payload far below the chunk size stays whole.
    expect(result.textContent.split("\n").length).toBe(1);
    expect(() => JSON.parse(result.textContent)).not.toThrow();
  });

  it("returns metadata with the source as title", async () => {
    const pipe = new JsonPipeline();
    const result = await pipe.process(
      makeRaw(pretty({ foo: "bar" })),
      dummyOptions,
      dummyFetcher,
    );
    expect(result.metadata.title).toBe("test.json");
  });
});
39 changes: 39 additions & 0 deletions src/scraper/pipelines/JsonPipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { JsonContentSplitter } from "../../splitter/splitters/JsonContentSplitter";
import type { RawContent } from "../fetcher/types";
import type { ContentFetcher } from "../fetcher/types";
import type { ScraperOptions } from "../types";
import { BasePipeline } from "./BasePipeline";
import type { ProcessedContent } from "./types";

/**
 * Pipeline for processing JSON content using the JsonContentSplitter.
 *
 * Accepts the common JSON MIME types, any structured-syntax suffix type
 * ("*+json", e.g. "application/ld+json", "application/vnd.api+json"),
 * and types carrying parameters such as "application/json; charset=utf-8".
 */
export class JsonPipeline extends BasePipeline {
  /** Default maximum chunk size (characters) handed to the splitter. */
  private static readonly DEFAULT_CHUNK_SIZE = 5000;

  canProcess(raw: RawContent): boolean {
    if (typeof raw.mimeType !== "string") {
      return false;
    }
    // Normalize: drop parameters ("; charset=...") and lowercase before
    // matching, so real-world HTTP Content-Type headers are recognized.
    const essence = raw.mimeType.toLowerCase().split(";", 1)[0]?.trim() ?? "";
    return (
      essence === "application/json" ||
      essence === "text/json" ||
      essence === "application/json5" ||
      // Covers "application/ld+json", "application/vnd.api+json", etc.
      essence.endsWith("+json")
    );
  }

  async process(
    raw: RawContent,
    _options: ScraperOptions,
    _fetcher: ContentFetcher,
  ): Promise<ProcessedContent> {
    // RawContent may carry a Buffer; decode it as UTF-8 text.
    const content =
      typeof raw.content === "string" ? raw.content : raw.content.toString("utf-8");
    const splitter = new JsonContentSplitter({
      chunkSize: JsonPipeline.DEFAULT_CHUNK_SIZE,
    });
    const chunks = await splitter.split(content);
    return {
      // NOTE(review): joining with "\n" assumes splitter chunks contain no raw
      // newlines — the test suite splits textContent on "\n" and relies on it.
      textContent: chunks.join("\n"),
      metadata: { title: raw.source },
      links: [], // JSON documents carry no hyperlinks to follow
      errors: [],
    };
  }
}
31 changes: 31 additions & 0 deletions src/scraper/strategies/LocalFileStrategy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,35 @@ describe("LocalFileStrategy", () => {
}),
);
});

// End-to-end check: a .json file on the virtual filesystem is fetched by
// FileFetcher (which maps the ".json" extension to "application/json") and
// must therefore be routed through the JsonPipeline.
it("should process .json files using the JsonPipeline", async () => {
const strategy = new LocalFileStrategy();
const options: ScraperOptions = {
url: "file:///testdir",
library: "test",
version: "1.0",
maxPages: 10,
maxDepth: 1,
maxConcurrency: 1,
};
const progressCallback = vi.fn();
const jsonContent = JSON.stringify({ a: 1, b: [2, 3, 4], c: { d: 5 } });
// memfs virtual volume: a single JSON file under /testdir
vol.fromJSON(
{
"/testdir/data.json": jsonContent,
},
"/",
);

await strategy.scrape(options, progressCallback);
// Exactly one file on disk => exactly one progress callback
expect(progressCallback).toHaveBeenCalledTimes(1);
const call = progressCallback.mock.calls[0][0];
expect(call.currentUrl).toBe("file:///testdir/data.json");
// Parse the output and check structure, not formatting
// NOTE(review): parsing the whole document assumes this small payload fits
// in a single chunk (pipeline default chunkSize is 5000) — confirm.
const parsed = JSON.parse(call.document.content);
expect(parsed.a).toBe(1);
expect(parsed.b).toEqual([2, 3, 4]);
expect(parsed.c).toEqual({ d: 5 });
expect(call.document.metadata.url).toBe("file:///testdir/data.json");
});
});
9 changes: 7 additions & 2 deletions src/scraper/strategies/LocalFileStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import fs from "node:fs/promises";
import path from "node:path";
import type { Document, ProgressCallback } from "../../types";
import { logger } from "../../utils/logger";
import { MimeTypeUtils } from "../../utils/mimeTypeUtils";
import { FileFetcher } from "../fetcher";
import type { RawContent } from "../fetcher/types";
import { HtmlPipeline } from "../pipelines/HtmlPipeline";
import { JsonPipeline } from "../pipelines/JsonPipeline";
import { MarkdownPipeline } from "../pipelines/MarkdownPipeline";
import type { ScraperOptions, ScraperProgress } from "../types";
import { BaseScraperStrategy, type QueueItem } from "./BaseScraperStrategy";
Expand All @@ -13,13 +15,15 @@ export class LocalFileStrategy extends BaseScraperStrategy {
private readonly fileFetcher = new FileFetcher();
private readonly htmlPipeline: HtmlPipeline;
private readonly markdownPipeline: MarkdownPipeline;
private readonly pipelines: [HtmlPipeline, MarkdownPipeline];
private readonly jsonPipeline: JsonPipeline;
private readonly pipelines: [HtmlPipeline, MarkdownPipeline, JsonPipeline];

constructor() {
super();
this.htmlPipeline = new HtmlPipeline();
this.markdownPipeline = new MarkdownPipeline();
this.pipelines = [this.htmlPipeline, this.markdownPipeline];
this.jsonPipeline = new JsonPipeline();
this.pipelines = [this.htmlPipeline, this.markdownPipeline, this.jsonPipeline];
}

canHandle(url: string): boolean {
Expand Down Expand Up @@ -92,6 +96,7 @@ export class LocalFileStrategy extends BaseScraperStrategy {
} finally {
await this.htmlPipeline.close();
await this.markdownPipeline.close();
await this.jsonPipeline.close();
}
}
}
21 changes: 21 additions & 0 deletions src/scraper/strategies/WebScraperStrategy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,25 @@ describe("WebScraperStrategy", () => {
expect(docCall![0].document.content).toContain(expectedMarkdown);
expect(docCall![0].document.metadata.title).toBe(expectedTitle);
});

// End-to-end check over HTTP: a response served with "application/json" must
// be routed through the JsonPipeline rather than the HTML/Markdown pipelines.
it("should process .json files using the JsonPipeline", async () => {
const progressCallback = vi.fn();
const testUrl = "https://example.com/data.json";
options.url = testUrl;
const jsonContent = JSON.stringify({ foo: [1, 2, 3], bar: { baz: true } });
// Stubbed fetcher response carrying the JSON MIME type
mockFetchFn.mockResolvedValue({
content: jsonContent,
mimeType: "application/json",
source: testUrl,
});
await strategy.scrape(options, progressCallback);
expect(progressCallback).toHaveBeenCalled();
const call = progressCallback.mock.calls[0][0];
expect(call.currentUrl).toBe(testUrl);
// Parse the output and check structure, not formatting
// NOTE(review): parsing the whole document assumes this small payload fits
// in a single chunk (pipeline default chunkSize is 5000) — confirm.
const parsed = JSON.parse(call.document.content);
expect(parsed.foo).toEqual([1, 2, 3]);
expect(parsed.bar).toEqual({ baz: true });
expect(call.document.metadata.url).toBe(testUrl);
});
});
5 changes: 4 additions & 1 deletion src/scraper/strategies/WebScraperStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { hasSameDomain, hasSameHostname, isSubpath } from "../../utils/url";
import { HttpFetcher } from "../fetcher";
import type { RawContent } from "../fetcher/types";
import { HtmlPipeline } from "../pipelines/HtmlPipeline";
import { JsonPipeline } from "../pipelines/JsonPipeline";
import { MarkdownPipeline } from "../pipelines/MarkdownPipeline";
import type { ContentPipeline, ProcessedContent } from "../pipelines/types";
import type { ScraperOptions, ScraperProgress } from "../types";
Expand All @@ -20,14 +21,16 @@ export class WebScraperStrategy extends BaseScraperStrategy {
private readonly shouldFollowLinkFn?: (baseUrl: URL, targetUrl: URL) => boolean;
private readonly htmlPipeline: HtmlPipeline;
private readonly markdownPipeline: MarkdownPipeline;
private readonly jsonPipeline: JsonPipeline;
private readonly pipelines: ContentPipeline[];

constructor(options: WebScraperStrategyOptions = {}) {
super({ urlNormalizerOptions: options.urlNormalizerOptions });
this.shouldFollowLinkFn = options.shouldFollowLink;
this.htmlPipeline = new HtmlPipeline();
this.markdownPipeline = new MarkdownPipeline();
this.pipelines = [this.htmlPipeline, this.markdownPipeline];
this.jsonPipeline = new JsonPipeline();
this.pipelines = [this.htmlPipeline, this.markdownPipeline, this.jsonPipeline];
}

canHandle(url: string): boolean {
Expand Down
32 changes: 32 additions & 0 deletions src/splitter/SemanticMarkdownSplitter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,36 @@ ${codeLines}
// Each chunk should be under the max size
expect(result.every((chunk) => chunk.content.length <= 20)).toBe(true);
});

// Fenced ```json blocks are split by the JSON-aware splitter so that every
// emitted chunk is itself valid JSON, re-wrapped in the code fence.
it("should split large JSON code blocks into valid JSON chunks", async () => {
const splitter = new SemanticMarkdownSplitter(10, 50); // small chunk size for test
const jsonArray = Array.from({ length: 10 }, (_, i) => ({ id: i, value: `val${i}` }));
const markdown = ["```json", JSON.stringify(jsonArray), "```"].join("\n");
const result = await splitter.splitText(markdown);
// All chunks should be code, valid JSON, and within size
expect(result.length).toBeGreaterThan(1);
for (const chunk of result) {
expect(chunk.types).toEqual(["code"]);
expect(chunk.content).toMatch(/^```json\n[\s\S]*\n```$/);
// Extract JSON body
const body = chunk.content.replace(/^```json\n/, "").replace(/\n```$/, "");
expect(() => JSON.parse(body)).not.toThrow();
// The 50-char limit must hold INCLUDING the re-added fence wrapper, which
// is why the splitter reserves wrapper overhead from its chunk budget.
expect(chunk.content.length).toBeLessThanOrEqual(50);
}
});

// Non-JSON fences (```js here) must keep using the generic code splitter,
// which makes no validity guarantee about individual chunks.
it("should fall back to normal code splitting for non-JSON code blocks", async () => {
const splitter = new SemanticMarkdownSplitter(10, 50);
const codeLines = Array.from({ length: 10 }, (_, i) => `console.log(${i});`).join(
"\n",
);
const markdown = ["```js", codeLines, "```"].join("\n");
const result = await splitter.splitText(markdown);
expect(result.length).toBeGreaterThan(1);
for (const chunk of result) {
expect(chunk.types).toEqual(["code"]);
expect(chunk.content).toMatch(/^```js\n[\s\S]*\n```$/);
expect(chunk.content.length).toBeLessThanOrEqual(50);
}
});
});
27 changes: 26 additions & 1 deletion src/splitter/SemanticMarkdownSplitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from "../utils/logger";
import { fullTrim } from "../utils/string";
import { ContentSplitterError, MinimumChunkSizeError } from "./errors";
import { CodeContentSplitter } from "./splitters/CodeContentSplitter";
import { JsonContentSplitter } from "./splitters/JsonContentSplitter";
import { TableContentSplitter } from "./splitters/TableContentSplitter";
import { TextContentSplitter } from "./splitters/TextContentSplitter";
import type { ContentChunk, DocumentSplitter, SectionContentType } from "./types";
Expand Down Expand Up @@ -40,6 +41,7 @@ export class SemanticMarkdownSplitter implements DocumentSplitter {
public textSplitter: TextContentSplitter;
public codeSplitter: CodeContentSplitter;
public tableSplitter: TableContentSplitter;
public jsonSplitter: JsonContentSplitter;

constructor(
private preferredChunkSize: number,
Expand Down Expand Up @@ -97,6 +99,9 @@ export class SemanticMarkdownSplitter implements DocumentSplitter {
this.tableSplitter = new TableContentSplitter({
chunkSize: this.maxChunkSize,
});
this.jsonSplitter = new JsonContentSplitter({
chunkSize: this.maxChunkSize,
});
}

/**
Expand Down Expand Up @@ -233,7 +238,27 @@ export class SemanticMarkdownSplitter implements DocumentSplitter {
break;
}
case "code": {
splitContent = await this.codeSplitter.split(content.text);
// Detect JSON code blocks
if (/^```json\s*/i.test(content.text)) {
// Remove code block markers for splitting
const jsonBody = content.text
.replace(/^```json\s*/i, "")
.replace(/```\s*$/, "");
// Account for code block wrapper overhead
const wrapperSize = "```json\n".length + "\n```".length; // 9 + 4 = 13
const allowedChunkSize = Math.max(1, this.maxChunkSize - wrapperSize);
// Use a temporary JsonContentSplitter with reduced chunk size
const jsonSplitter = new JsonContentSplitter({
chunkSize: allowedChunkSize,
});
splitContent = await jsonSplitter.split(jsonBody);
// Re-wrap as code blocks
splitContent = splitContent.map((chunk) =>
["```json", chunk, "```"].join("\n"),
);
} else {
splitContent = await this.codeSplitter.split(content.text);
}
break;
}
case "table": {
Expand Down
Loading