result-aggregation
npx machina-cli add skill othmane55/claude-collective-intelligence/result-aggregation --openclawResult Aggregation
Collect, combine, and synthesize results from distributed agent execution.
Quick Start
Basic Result Collection
const results = [];
await client.consumeResults('agent.results', async (msg) => {
results.push(msg.result);
console.log(`Received result ${results.length}`);
});
Wait for All Results
async function collectAllResults(taskIds) {
const results = new Map();
return new Promise((resolve) => {
client.consumeResults('agent.results', async (msg) => {
const { taskId, result } = msg.result;
results.set(taskId, result);
// Check if all results received
if (results.size === taskIds.length) {
resolve(Array.from(results.values()));
}
});
});
}
Aggregation Patterns
Pattern 1: Collect and Merge
Combine all results into single output.
async function collectAndMerge(taskCount) {
const results = [];
await consumeResults('agent.results', async (msg) => {
results.push(msg.result);
if (results.length === taskCount) {
// All results received, merge them
const merged = mergeResults(results);
console.log('Final result:', merged);
}
});
}
function mergeResults(results) {
// Example: Merge test results
return {
totalTests: sum(results.map(r => r.tests)),
passed: sum(results.map(r => r.passed)),
failed: sum(results.map(r => r.failed)),
duration: max(results.map(r => r.duration)),
coverage: average(results.map(r => r.coverage))
};
}
Pattern 2: Map-Reduce
Process results in stages.
// Map phase: Distribute work
const chunks = splitData(largeDataset, 10);
for (const chunk of chunks) {
await assignTask({ type: 'map', data: chunk });
}
// Intermediate collect
const mapResults = await collectResults(chunks.length);
// Reduce phase: Aggregate
const finalResult = reduceResults(mapResults);
function reduceResults(mapResults) {
return mapResults.reduce((acc, result) => {
// Combine results
Object.keys(result).forEach(key => {
acc[key] = (acc[key] || 0) + result[key];
});
return acc;
}, {});
}
Pattern 3: Streaming Aggregation
Process results as they arrive.
let partialResult = initializeResult();
await consumeResults('agent.results', async (msg) => {
// Update partial result immediately
partialResult = updateResult(partialResult, msg.result);
// Display running total
console.log('Current status:', partialResult);
// Publish intermediate results
await publishStatus({
event: 'progress_update',
partial: partialResult
});
});
Pattern 4: Quorum-Based
Wait for majority, not all results.
async function quorumAggregation(totalAgents, quorum = 0.6) {
const results = [];
const requiredResults = Math.ceil(totalAgents * quorum);
await consumeResults('agent.results', async (msg) => {
results.push(msg.result);
if (results.length >= requiredResults) {
// Got quorum, can proceed
const consensusResult = buildConsensus(results);
return consensusResult;
}
});
}
Result Synthesis
Synthesize Brainstorm Responses
function synthesizeBrainstorm(responses) {
// Extract all suggestions
const allSuggestions = responses.flatMap(r =>
r.response.recommendation
);
// Find common themes
const themes = extractThemes(allSuggestions);
// Calculate support for each theme
const themesWithSupport = themes.map(theme => ({
theme,
support: calculateSupport(theme, responses),
agents: findSupportingAgents(theme, responses)
}));
// Sort by support
themesWithSupport.sort((a, b) => b.support - a.support);
// Identify consensus
const consensus = themesWithSupport[0].support > 0.7
? 'STRONG'
: themesWithSupport[0].support > 0.5
? 'MODERATE'
: 'WEAK';
return {
consensus,
majorityView: themesWithSupport[0],
allThemes: themesWithSupport,
conflicts: identifyConflicts(responses),
synthesis: synthesizeRecommendation(themesWithSupport)
};
}
Combine Test Results
function aggregateTestResults(results) {
const combined = {
suites: [],
totals: {
tests: 0,
passed: 0,
failed: 0,
skipped: 0,
duration: 0
},
coverage: {
lines: 0,
functions: 0,
branches: 0,
statements: 0
}
};
for (const result of results) {
// Combine test suites
combined.suites.push(...result.suites);
// Sum totals
combined.totals.tests += result.totals.tests;
combined.totals.passed += result.totals.passed;
combined.totals.failed += result.totals.failed;
combined.totals.skipped += result.totals.skipped;
combined.totals.duration = Math.max(
combined.totals.duration,
result.totals.duration
);
// Average coverage
Object.keys(combined.coverage).forEach(key => {
combined.coverage[key] += result.coverage[key];
});
}
// Calculate average coverage
const count = results.length;
Object.keys(combined.coverage).forEach(key => {
combined.coverage[key] /= count;
});
return combined;
}
Merge Code Analysis Results
function mergeAnalysisResults(results) {
const merged = {
issues: [],
metrics: {},
suggestions: []
};
for (const result of results) {
// Combine issues (deduplicate)
const newIssues = result.issues.filter(issue =>
!merged.issues.some(existing =>
isSameIssue(existing, issue)
)
);
merged.issues.push(...newIssues);
// Merge metrics
Object.keys(result.metrics).forEach(key => {
if (!merged.metrics[key]) {
merged.metrics[key] = [];
}
merged.metrics[key].push(result.metrics[key]);
});
// Combine suggestions (deduplicate)
const newSuggestions = result.suggestions.filter(s =>
!merged.suggestions.includes(s)
);
merged.suggestions.push(...newSuggestions);
}
// Aggregate metrics
Object.keys(merged.metrics).forEach(key => {
const values = merged.metrics[key];
merged.metrics[key] = {
min: Math.min(...values),
max: Math.max(...values),
avg: average(values),
total: sum(values)
};
});
// Prioritize issues and suggestions
merged.issues.sort((a, b) => b.severity - a.severity);
merged.suggestions = dedupAndPrioritize(merged.suggestions);
return merged;
}
Statistical Aggregation
Calculate Statistics
function calculateStatistics(results) {
const values = results.map(r => r.value);
return {
count: values.length,
sum: sum(values),
min: Math.min(...values),
max: Math.max(...values),
mean: average(values),
median: median(values),
mode: mode(values),
stdDev: standardDeviation(values),
variance: variance(values),
percentiles: {
p50: percentile(values, 50),
p75: percentile(values, 75),
p90: percentile(values, 90),
p95: percentile(values, 95),
p99: percentile(values, 99)
}
};
}
Outlier Detection
function detectOutliers(results) {
const stats = calculateStatistics(results);
const outliers = results.filter(r => {
const zScore = Math.abs((r.value - stats.mean) / stats.stdDev);
return zScore > 2; // More than 2 standard deviations
});
return {
outliers,
cleanResults: results.filter(r => !outliers.includes(r)),
stats: calculateStatistics(
results.filter(r => !outliers.includes(r))
)
};
}
Timeout and Partial Results
Handle Incomplete Results
async function collectWithTimeout(taskCount, timeout = 60000) {
const results = [];
const startTime = Date.now();
return new Promise((resolve) => {
const timeoutId = setTimeout(() => {
console.warn(`⏰ Timeout: Only ${results.length}/${taskCount} results received`);
// Return partial results
resolve({
complete: false,
received: results.length,
expected: taskCount,
results: results,
missing: taskCount - results.length
});
}, timeout);
consumeResults('agent.results', async (msg) => {
results.push(msg.result);
if (results.length === taskCount) {
clearTimeout(timeoutId);
resolve({
complete: true,
results: results
});
}
});
});
}
Progressive Disclosure
// Show results as they arrive
async function progressiveAggregation(taskCount) {
const results = [];
let lastUpdate = Date.now();
await consumeResults('agent.results', async (msg) => {
results.push(msg.result);
// Update every 2 seconds or when complete
const now = Date.now();
if (now - lastUpdate > 2000 || results.length === taskCount) {
lastUpdate = now;
const partial = aggregatePartial(results);
console.log(`Progress: ${results.length}/${taskCount}`);
console.log('Current results:', partial);
// Publish progress
await publishStatus({
event: 'aggregation_progress',
progress: results.length / taskCount,
partial
});
}
if (results.length === taskCount) {
const final = aggregateFinal(results);
console.log('Final results:', final);
}
});
}
Weighted Aggregation
Weight by Confidence
function weightedAggregation(responses) {
// Weight each response by its confidence score
const totalConfidence = sum(responses.map(r => r.confidence));
const weighted = responses.map(r => ({
...r,
weight: r.confidence / totalConfidence
}));
// Calculate weighted average recommendation
const recommendation = buildWeightedRecommendation(weighted);
return {
recommendation,
confidence: average(responses.map(r => r.confidence)),
responses: weighted
};
}
Weight by Agent Expertise
function expertiseWeightedAggregation(responses, agentExpertise) {
const weighted = responses.map(r => {
const expertise = agentExpertise[r.from] || 0.5;
return {
...r,
weight: expertise
};
});
return buildWeightedRecommendation(weighted);
}
Deduplication
Deduplicate Results
function deduplicateResults(results) {
const seen = new Set();
const unique = [];
for (const result of results) {
const key = generateResultKey(result);
if (!seen.has(key)) {
seen.add(key);
unique.push(result);
}
}
return unique;
}
function generateResultKey(result) {
// Create unique key based on result content
const normalized = JSON.stringify(result, Object.keys(result).sort());
return hash(normalized);
}
Error Handling
Handle Failed Results
async function aggregateWithErrorHandling(taskIds) {
const results = [];
const errors = [];
await consumeResults('agent.results', async (msg) => {
const { result } = msg;
if (result.status === 'error') {
errors.push(result);
console.error(`Task ${result.taskId} failed:`, result.error);
} else {
results.push(result);
}
if (results.length + errors.length === taskIds.length) {
// All tasks complete (success or failure)
const aggregated = aggregateSuccessfulResults(results);
return {
success: results.length,
failed: errors.length,
results: aggregated,
errors: errors
};
}
});
}
Best Practices
- Set Timeouts: Don't wait indefinitely for results
- Handle Partial Results: Gracefully handle incomplete data
- Deduplicate: Remove duplicate results
- Validate: Check result integrity before aggregating
- Progressive Updates: Show results as they arrive
- Error Handling: Account for failed tasks
- Weighted Aggregation: Consider confidence/expertise
- Single Consumer Pattern: Only ONE consumer type per queue (see below)
Critical Architecture Warning
Result Queue Consumer Conflict (December 7, 2025)
The agent.results queue should have a SINGLE consumer pattern:
PROBLEM: Multiple consumer types on same queue
+-------------------+
| agent.results |
+-------------------+
/ \
v v
Leader Worker
(task results) (brainstorm)
CONFLICT! Messages go to random consumer!
Solution Pattern:
CORRECT: Separate queues for separate purposes
+-------------------+ +---------------------------+
| agent.results | | agent.brainstorm.results |
+-------------------+ +---------------------------+
| |
v v
Leader Workers
(task results) (brainstorm responses)
Before aggregating results, verify:
- Only intended consumers listen to the queue
- No competing consumers for same message type
- Message routing is deterministic
See: docs/lessons/LESSONS_LEARNED.md for full analysis
Examples
See examples/aggregation/:
map-reduce.js- Map-reduce patternstreaming.js- Streaming aggregationquorum.js- Quorum-based collectionweighted.js- Confidence-weighted synthesistimeout.js- Handling timeouts and partial results
Source
git clone https://github.com/othmane55/claude-collective-intelligence/blob/main/skills/result-aggregation/SKILL.mdView on GitHub Overview
Result aggregation collects, combines, and synthesizes results from distributed agent execution into a single, cohesive output. It’s ideal for unifying parallel runs, brainstorm sessions, or map-reduce workflows.
How This Skill Works
Results are collected from distributed agents via a shared channel (e.g., agent.results). Depending on the use case, you either wait for all results, wait for quorum, stream updates, or perform a map-reduce style multi-stage aggregation. A final synthesis step merges or reduces the collected results into a single output (e.g., mergeResults or reduceResults).
When to Use It
- When you need a single report from parallel agent executions
- When synthesizing brainstorm responses to identify themes and consensus
- When processing large datasets with map and reduce stages
- When you want real-time progress by streaming results as they arrive
- When a quorum-based majority decision suffices and some results may be delayed
Quick Start
- Step 1: Set up a collector and push incoming results into a collection (array or map)
- Step 2: Choose a completion condition (all results or quorum) and resolve when met
- Step 3: Apply a final aggregation function (mergeResults or reduceResults) to produce the final output
Best Practices
- Choose the aggregation pattern (all, quorum, map-reduce, streaming) based on task requirements
- Define a stable result schema (predictable fields in msg.result) to simplify merging
- Handle partial results, timeouts, and failures gracefully with sensible fallbacks
- Validate and normalize results before synthesis to prevent skew or errors
- Log received results and publish intermediate progress for observability
Example Use Cases
- Collect and merge unit test results from parallel runners into a single report (total, passed, failed, duration, coverage)
- Synthesize brainstorm ideas by extracting recommendations and identifying common themes
- Partition a dataset into chunks, map results, then reduce to final metrics or summary
- Stream live status updates as results arrive and publish intermediate progress
- Use quorum-based aggregation to reach a decision when some results are slow or missing