Get the FREE Ultimate OpenClaw Setup Guide →

agent-factory

Flagged

{"isSafe":false,"isSuspicious":true,"riskLevel":"medium","findings":[{"category":"prompt_injection","severity":"high","description":"Task-instruction content is injected into the worker's skill prompt via direct string substitution without sanitization, enabling potential prompt-injection that could override safety constraints or cause unsafe behavior in worker executions.","evidence":"worker_skill = worker_skill.replace('{task_instructions}', task['task_instructions'])"}],"summary":"The skill content is largely safe in static form, but it dynamically inserts external task_instructions into the generated worker prompt without validation. This creates a prompt-injection risk where malicious or oversized instructions could override guard rails or steer the worker toward unsafe actions. No direct shell commands, data exfiltration, or external malicious URLs are present in the static content. Mitigate by validating/sanitizing task_instructions, constraining length, and escaping substitutions before embedding into prompts."}

npx machina-cli add skill Ibrahim-3d/conductor-orchestrator-superpowers/agent-factory --openclaw
Files (1)
SKILL.md
11.1 KB

Agent Factory -- Dynamic Worker Creation

Creates ephemeral worker agents from templates, specializing them based on task type.

Worker Creation Flow

Task from DAG -> Determine Type -> Select Template -> Substitute Placeholders -> Spawn Worker

Template Selection

Task TypeTemplateSpecialization
codecode-worker.template.mdTDD, code patterns, tests
uiui-worker.template.mdDesign system, accessibility
integrationintegration-worker.template.mdAPI contracts, error handling
testtest-worker.template.mdCoverage targets, test patterns
docstask-worker.template.mdBase template
configtask-worker.template.mdBase template

CreateWorkerAgent Procedure

def create_worker_agent(task: dict, track_id: str, message_bus_path: str) -> dict:
    """
    Create a specialized worker agent for a task.

    Args:
        task: Task node from DAG (id, name, type, files, depends_on, acceptance)
        track_id: Current track identifier
        message_bus_path: Path to message bus directory

    Returns:
        dict with worker_id, skill_path, prompt
    """

    # 1. Generate unique worker ID
    timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    worker_id = f"worker-{task['id']}-{timestamp}"

    # 2. Select template based on task type
    task_type = task.get('type', 'code')
    template_map = {
        'code': 'code-worker.template.md',
        'ui': 'ui-worker.template.md',
        'integration': 'integration-worker.template.md',
        'test': 'test-worker.template.md',
    }
    template_name = template_map.get(task_type, 'task-worker.template.md')
    template_path = f".claude/skills/worker-templates/{template_name}"

    # 3. Read template
    template = read_file(template_path)

    # 4. Prepare substitution values
    substitutions = {
        '{task_id}': task['id'],
        '{task_name}': task['name'],
        '{track_id}': track_id,
        '{phase}': str(task.get('phase', 1)),
        '{files}': format_list(task.get('files', [])),
        '{depends_on}': format_list(task.get('depends_on', [])),
        '{acceptance}': task.get('acceptance', 'Complete the task as specified'),
        '{message_bus_path}': message_bus_path,
        '{timestamp}': timestamp,
        '{worker_id}': worker_id,
        '{unblocks}': format_list(find_unblocked_tasks(task['id'])),
    }

    # 5. Substitute placeholders
    worker_skill = template
    for placeholder, value in substitutions.items():
        worker_skill = worker_skill.replace(placeholder, value)

    # 6. Add task-specific instructions
    if task.get('task_instructions'):
        worker_skill = worker_skill.replace(
            '{task_instructions}',
            task['task_instructions']
        )
    else:
        worker_skill = worker_skill.replace(
            '{task_instructions}',
            f"Implement: {task['name']}\n\nAcceptance: {task.get('acceptance', 'N/A')}"
        )

    # 7. Add base protocol
    base_protocol = read_file(".claude/skills/worker-templates/task-worker.template.md")
    base_protocol_section = extract_section(base_protocol, "## Execution Protocol")
    worker_skill = worker_skill.replace('{base_worker_protocol}', base_protocol_section)

    # 8. Create worker skill directory (ephemeral)
    worker_skill_path = f".claude/skills/workers/{worker_id}/SKILL.md"
    os.makedirs(os.path.dirname(worker_skill_path), exist_ok=True)
    write_file(worker_skill_path, worker_skill)

    # 9. Generate dispatch prompt
    dispatch_prompt = f"""You are worker agent {worker_id}.

Your task: {task['name']} (Task {task['id']})

MESSAGE BUS: {message_bus_path}

Follow your worker skill instructions at: {worker_skill_path}

Protocol:
1. Check dependencies via message bus
2. Acquire file locks before modifying
3. Post progress every 5 min
4. Post TASK_COMPLETE when done

Execute autonomously. Do NOT wait for user input."""

    return {
        'worker_id': worker_id,
        'skill_path': worker_skill_path,
        'prompt': dispatch_prompt,
        'task_id': task['id'],
        'task_type': task_type
    }

Batch Worker Creation

For parallel groups, create all workers at once:

def create_workers_for_parallel_group(
    parallel_group: dict,
    dag: dict,
    track_id: str,
    message_bus_path: str
) -> list:
    """
    Create workers for all tasks in a parallel group.

    Args:
        parallel_group: Parallel group definition (id, tasks, conflict_free)
        dag: Full DAG with all task nodes
        track_id: Current track identifier
        message_bus_path: Path to message bus

    Returns:
        List of worker definitions ready for dispatch
    """

    workers = []

    for task_id in parallel_group['tasks']:
        # Find task in DAG
        task = next((n for n in dag['nodes'] if n['id'] == task_id), None)
        if not task:
            continue

        # Create worker
        worker = create_worker_agent(task, track_id, message_bus_path)

        # Add coordination info if not conflict-free
        if not parallel_group.get('conflict_free', True):
            worker['requires_coordination'] = True
            worker['shared_resources'] = parallel_group.get('shared_resources', [])

        workers.append(worker)

    return workers

Worker Dispatch

Dispatch workers via parallel Task calls:

def dispatch_workers(workers: list) -> list:
    """
    Dispatch multiple workers in parallel using Task tool.

    Returns list of Task call results.
    """

    # Create Task calls for all workers
    task_calls = []
    for worker in workers:
        task_calls.append({
            'subagent_type': 'general-purpose',
            'description': f"Execute {worker['task_id']}: {worker.get('task_name', 'task')}",
            'prompt': worker['prompt'],
            'run_in_background': True  # Run in background for true parallelism
        })

    # Dispatch all at once (Claude Code handles parallel calls)
    results = []
    for call in task_calls:
        result = Task(**call)
        results.append(result)

    return results

Worker Cleanup

After task completion, cleanup worker artifacts:

def cleanup_worker(worker_id: str):
    """
    Remove ephemeral worker skill directory.
    Called by orchestrator after worker reports completion.
    """

    worker_skill_path = f".claude/skills/workers/{worker_id}"

    if os.path.exists(worker_skill_path):
        shutil.rmtree(worker_skill_path)

    # Log cleanup
    print(f"Cleaned up worker: {worker_id}")

Cleanup All Workers

After parallel group completes:

def cleanup_parallel_group_workers(parallel_group_id: str, workers: list):
    """
    Cleanup all workers from a completed parallel group.
    """

    for worker in workers:
        cleanup_worker(worker['worker_id'])

    # Remove workers directory if empty
    workers_dir = ".claude/skills/workers"
    if os.path.exists(workers_dir) and not os.listdir(workers_dir):
        os.rmdir(workers_dir)

Helper Functions

def format_list(items: list) -> str:
    """Format list for template substitution."""
    if not items:
        return "None"
    return "\n".join(f"- {item}" for item in items)


def find_unblocked_tasks(task_id: str, dag: dict) -> list:
    """Find tasks that will be unblocked when task_id completes."""
    unblocked = []
    for node in dag.get('nodes', []):
        if task_id in node.get('depends_on', []):
            # Check if this is the only remaining dependency
            remaining_deps = [d for d in node['depends_on'] if d != task_id]
            if not remaining_deps:
                unblocked.append(node['id'])
    return unblocked


def extract_section(content: str, section_header: str) -> str:
    """Extract a section from markdown content."""
    lines = content.split('\n')
    in_section = False
    section_lines = []

    for line in lines:
        if line.startswith(section_header):
            in_section = True
            continue
        elif in_section and line.startswith('## '):
            break
        elif in_section:
            section_lines.append(line)

    return '\n'.join(section_lines).strip()

Integration with Orchestrator

The orchestrator calls the agent factory during PARALLEL_EXECUTE:

# In conductor-orchestrator

async def execute_parallel_phase(phase: Phase, dag: dict):
    # 1. Get parallel groups for this phase
    parallel_groups = [
        pg for pg in dag.get('parallel_groups', [])
        if all(task_in_phase(t, phase) for t in pg['tasks'])
    ]

    for pg in parallel_groups:
        # 2. Create workers via agent factory
        workers = create_workers_for_parallel_group(
            pg, dag, track_id, message_bus_path
        )

        # 3. Dispatch workers in parallel
        results = dispatch_workers(workers)

        # 4. Monitor message bus for completion
        await wait_for_group_completion(pg, message_bus_path)

        # 5. Cleanup workers
        cleanup_parallel_group_workers(pg['id'], workers)

Worker Lifecycle

+---------------------------------------------------------------+
|                      WORKER LIFECYCLE                          |
|                                                                |
|  1. CREATE                                                     |
|     Agent Factory -> Template -> Substitution -> Skill Dir     |
|                                                                |
|  2. DISPATCH                                                   |
|     Orchestrator -> Task(prompt, run_in_background) -> Worker  |
|                                                                |
|  3. EXECUTE                                                    |
|     Worker -> Check Deps -> Lock Files -> Implement -> Commit  |
|                                                                |
|  4. REPORT                                                     |
|     Worker -> Message Bus -> TASK_COMPLETE/TASK_FAILED         |
|                                                                |
|  5. CLEANUP                                                    |
|     Orchestrator -> cleanup_worker() -> Remove Skill Dir       |
|                                                                |
+---------------------------------------------------------------+

Error Handling

def handle_worker_failure(worker: dict, error: str, message_bus_path: str):
    """
    Handle worker failure gracefully.

    1. Post failure to message bus
    2. Release any held locks
    3. Cleanup worker artifacts
    4. Notify orchestrator
    """

    # Post failure message
    post_message(message_bus_path, "TASK_FAILED", worker['worker_id'], {
        "task_id": worker['task_id'],
        "error": error
    })

    # Release all locks held by this worker
    release_all_locks_for_worker(message_bus_path, worker['worker_id'])

    # Cleanup worker
    cleanup_worker(worker['worker_id'])

Source

git clone https://github.com/Ibrahim-3d/conductor-orchestrator-superpowers/blob/master/skills/agent-factory/SKILL.mdView on GitHub

Overview

Agent Factory dynamically creates ephemeral, task-specific workers from templates. It selects the right template based on task type, substitutes metadata, and spawns a dedicated worker that follows a create → execute → cleanup lifecycle.

How This Skill Works

From a DAG task, it determines the task type, selects a corresponding worker template, substitutes placeholders with task data (ids, files, dependencies, acceptance, etc.), and writes an ephemeral worker skill under .claude/skills/workers/{worker_id}/SKILL.md. A dispatch prompt is generated to initiate the worker's execution within the orchestrator.

When to Use It

  • You need per-type specialized workers (code, ui, integration, test, docs) spawned from templates.
  • Tasks should run in parallel with dedicated, isolated workers.
  • You want dynamic generation of workers based on task metadata and placeholders.
  • A clear worker lifecycle is required: create, execute, then cleanup.
  • Task metadata (files, dependencies, acceptance criteria) must drive worker configuration.

Quick Start

  1. Step 1: Define a task in the DAG (id, name, type, files, depends_on, acceptance) and call create_worker_agent with a track_id and message_bus_path.
  2. Step 2: The factory selects the proper template, substitutes placeholders, and writes an ephemeral worker skill at .claude/skills/workers/{worker_id}/SKILL.md.
  3. Step 3: A dispatch prompt is generated and the worker executes; upon completion, the worker is cleaned up.

Best Practices

  • Define and maintain distinct templates for each task type to reflect current requirements.
  • Keep templates simple, idempotent, and stateless to ensure reliable reuse.
  • Ensure all placeholders in templates are covered by substitutions and inputs are validated.
  • Use meaningful worker IDs and track IDs to enable end-to-end traceability.
  • Test the ephemeral lifecycle and cleanup to prevent orphaned workers.

Example Use Cases

  • A code-type DAG task spawns a code worker (code-worker.template.md) to apply tests and verify code patterns.
  • A ui-type task creates a UI worker (ui-worker.template.md) to validate design system and accessibility.
  • An integration-type task launches an integration worker (integration-worker.template.md) to check API contracts and error handling.
  • A test-type task deploys a test worker (test-worker.template.md) to enforce coverage targets and test patterns.
  • A docs or config task uses the base task-worker.template.md to generate documentation or apply configuration changes.

Frequently Asked Questions

Add this skill to your agents
Sponsor this space

Reach thousands of developers