> For the complete documentation index, see [llms.txt](https://host2host.onibonje.com/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://host2host.onibonje.com/docs/08-camel-integration-patterns.md).

# Camel Integration Patterns

## 1. Overview

Apache Camel is the **orchestration engine** of the H2H platform. This document defines standard patterns for route design, cross-cutting concerns, custom components, and database-driven pipeline execution.

## 2. Camel Context Structure

Each runtime pod runs a single Camel context with routes contributed by enabled JAR modules:

```mermaid
flowchart TB
  subgraph context [Camel Context]
    subgraph ingress [Ingress Routes]
      SFTP_R[SFTP Poller]
      API_R[HTTP Listener]
      KAFKA_R[Kafka Consumer]
    end

    subgraph pipeline [Processing Pipeline]
      RESOLVE[Config Resolver]
      EXECUTE[Step Executor]
    end

    subgraph egress [Egress Routes]
      FINACLE_R[Finacle Post]
      ACK_R[ACK Delivery]
      AUDIT_R[Audit Emitter]
    end

    subgraph error [Error Handling]
      RETRY_R[Retry Handler]
      DLQ_R[Dead Letter Queue]
      EXC_R[Exception Queue]
    end

    ingress --> RESOLVE --> EXECUTE
    EXECUTE --> egress
    EXECUTE -->|error| error
  end
```

## 3. Route Design Principles

| Principle               | Rule                                                         |
| ----------------------- | ------------------------------------------------------------ |
| Thin routes             | Routes orchestrate; logic lives in processors and components |
| No hardcoded config     | All partner details from `ConfigResolver`                    |
| Standard error handling | Global `onException` in `h2h-camel-core`                     |
| Correlation propagation | Set `h2h.correlationId` on every exchange                    |
| Idempotent by default   | Every processing route passes through idempotency check      |
| Audit everything        | Audit processor fires on success and failure                 |

## 4. Standard Route Skeleton

### 4.1 File Ingestion Route

```java
@Component
public class FileIngestRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("sftp-dynamic:poll")
            .routeId("file-ingest")
            .process(correlationIdProcessor)
            .process(partnerIdentificationProcessor)
            .process(configResolverProcessor)
            .to("direct:execute-profile");
    }
}
```

### 4.2 Profile Execution Route

```java
from("direct:execute-profile")
    .routeId("execute-profile")
    .process(stepPipelineInitializer)
    .loop(simple("${exchangeProperty.h2h.steps.size}"))
        .process(stepExecutor)
    .end()
    .to("direct:finalize");
```

### 4.3 Finalization Route

```java
from("direct:finalize")
    .routeId("finalize")
    .choice()
        .when(simple("${exchangeProperty.h2h.status} == 'SUCCESS'"))
            .to("ack-nack:generate")
            .to("direct:deliver-ack")
        .when(simple("${exchangeProperty.h2h.status} == 'PARTIAL'"))
            .to("ack-nack:generate-partial")
            .to("direct:deliver-ack")
        .otherwise()
            .to("direct:handle-failure")
    .end();
```

## 5. Cross-Cutting Concerns (h2h-camel-core)

### 5.1 Correlation ID Processor

```java
@Component
public class CorrelationIdProcessor implements Processor {
    @Override
    public void process(Exchange exchange) {
        String correlationId = exchange.getIn().getHeader("X-Correlation-Id", String.class);
        if (correlationId == null) {
            correlationId = UUID.randomUUID().toString();
        }
        exchange.setProperty("h2h.correlationId", correlationId);
        exchange.getIn().setHeader("X-Correlation-Id", correlationId);
        MDC.put("correlationId", correlationId);
    }
}
```

### 5.2 Config Resolver Processor

```java
@Component
public class ConfigResolverProcessor implements Processor {
    private final ConfigResolver configResolver;

    @Override
    public void process(Exchange exchange) {
        String partnerId = exchange.getProperty("h2h.partnerId", String.class);
        String messageType = exchange.getProperty("h2h.messageType", String.class);

        IntegrationProfile profile = configResolver.resolve(
            new ResolveKey(partnerId, messageType, environment)
        );
        exchange.setProperty("h2h.profile", profile);
    }
}
```

### 5.3 Global Error Handler

```java
@Override
public void configure() {
    onException(Exception.class)
        .handled(true)
        .maximumRedeliveries(exchangeProperty("h2h.maxRetries"))
        .redeliveryDelay(5000)
        .useExponentialBackOff()
        .onRedelivery(failureAuditProcessor)
        .process(exceptionClassifierProcessor)
        .choice()
            .when(simple("${exchangeProperty.h2h.retryable} == true"))
                .to("event:publish?eventCode=RETRY_MESSAGE")
            .when(simple("${exchangeProperty.h2h.manualReview} == true"))
                .to("camunda:exception-review")
            .otherwise()
                .to("event:publish?eventCode=DLQ_MESSAGE");
```

Physical Kafka/Rabbit destinations resolve from `event_channel_def` ([46](/docs/46-database-driven-events.md)). .end(); }

````

### 5.4 Idempotency Component

```java
@Component
public class IdempotencyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) {
        String idempotencyKey = buildKey(exchange);
        if (idempotencyStore.exists(idempotencyKey)) {
            exchange.setProperty("h2h.duplicate", true);
            exchange.setProperty("h2h.status", "DUPLICATE");
        } else {
            idempotencyStore.store(idempotencyKey, exchange.getProperty("h2h.correlationId"));
            exchange.setProperty("h2h.duplicate", false);
        }
    }
}
````

### 5.5 Audit Processor

```java
@Component
public class AuditProcessor implements Processor {
    @Override
    public void process(Exchange exchange) {
        AuditEvent event = AuditEvent.builder()
            .correlationId(exchange.getProperty("h2h.correlationId", String.class))
            .partnerId(exchange.getProperty("h2h.partnerId", String.class))
            .step(exchange.getProperty("h2h.currentStep", String.class))
            .status(exchange.getProperty("h2h.status", String.class))
            .timestamp(Instant.now())
            .build();
        eventPublisher.publish("AUDIT_EVENT", event, context);
    }
}
```

## 6. Custom Camel Components

### 6.1 Component Registry

| URI Scheme               | Module            | Purpose                                                  |
| ------------------------ | ----------------- | -------------------------------------------------------- |
| `finacle:postPayment`    | finacle-wrapper   | Post payment via injected `CoreBankingIntegration`       |
| `finacle:accountInquiry` | finacle-wrapper   | Account validation via injected `CoreBankingIntegration` |
| `crypto:pgpDecrypt`      | security          | PGP decryption                                           |
| `crypto:pgpEncrypt`      | security          | PGP encryption                                           |
| `transform:apply`        | transform modules | Apply DB-driven transform spec                           |
| `idempotent:check`       | camel-core        | Duplicate detection                                      |
| `ack-nack:generate`      | ack-nack          | Generate acknowledgement                                 |
| `sftp-dynamic:poll`      | file-adapter      | SFTP with DB-resolved connection                         |
| `sftp-dynamic:deliver`   | file-adapter      | Deliver to partner outbox                                |
| `http-dynamic:serve`     | http-ingress      | REST/SOAP ingress from `channel_config`                  |
| `ws-dynamic:push`        | websocket-ingress | Real-time event push to partner sessions                 |

### 6.2 Dynamic SFTP Component

SFTP connection details are resolved from the integration profile, not hardcoded in the URI:

```java
public class DynamicSftpEndpoint extends ScheduledPollEndpoint {
    @Override
    protected Consumer createConsumer(Processor processor) {
        IntegrationProfile profile = resolveProfile();
        ChannelConfig channel = profile.getChannelConfig();
        String sftpUri = buildSftpUri(channel);  // host from Vault, paths from DB
        return getCamelContext().getEndpoint(sftpUri).createConsumer(processor);
    }
}
```

### 6.3 Transform Component

```java
public class TransformProducer extends DefaultProducer {
    @Override
    public void process(Exchange exchange) {
        IntegrationProfile profile = exchange.getProperty("h2h.profile", IntegrationProfile.class);
        TransformSpec spec = configResolver.getTransform(profile.getTransformInboundId());

        Object result = switch (spec.getEngine()) {
            case "JOLT" -> joltTransformer.transform(exchange.getIn().getBody(), spec.getSpecBody());
            case "JSONATA" -> jsonataTransformer.transform(exchange.getIn().getBody(), spec.getSpecBody());
            case "XSLT" -> xsltTransformer.transform(exchange.getIn().getBody(), spec.getSpecBody());
            default -> throw new UnsupportedTransformEngineException(spec.getEngine());
        };
        exchange.getIn().setBody(result);
    }
}
```

## 7. Step Executor Pattern

The step executor maps database `step_code` values to Spring-registered processors:

```java
@Component
public class StepExecutor implements Processor {
    private final StepRegistry stepRegistry;
    private final AuditProcessor auditProcessor;

    @Override
    public void process(Exchange exchange) {
        List<RouteStep> steps = exchange.getProperty("h2h.steps", List.class);
        int index = exchange.getProperty("h2h.stepIndex", Integer.class);
        RouteStep step = steps.get(index);

        exchange.setProperty("h2h.currentStep", step.getStepCode());

        try {
            Processor processor = stepRegistry.get(step.getStepCode());
            processor.process(exchange);
            auditProcessor.process(exchange);
        } catch (Exception e) {
            exchange.setProperty("h2h.failedStep", step.getStepCode());
            throw e;
        }

        exchange.setProperty("h2h.stepIndex", index + 1);
    }
}
```

### 7.1 Step Registry

```java
@Component
public class StepRegistry {
    private final Map<String, Processor> steps;

    public StepRegistry(
            @Qualifier("pgpDecryptProcessor") Processor pgpDecrypt,
            @Qualifier("fileValidateProcessor") Processor fileValidate,
            @Qualifier("transformProcessor") Processor transform,
            @Qualifier("businessValidateProcessor") Processor businessValidate,
            @Qualifier("idempotencyProcessor") Processor idempotency,
            @Qualifier("finaclePostProcessor") Processor finaclePost,
            @Qualifier("auditProcessor") Processor audit,
            @Qualifier("ackGenerateProcessor") Processor ackGenerate,
            @Qualifier("ackDeliverProcessor") Processor ackDeliver) {
        steps = Map.of(
            "PGP_DECRYPT", pgpDecrypt,
            "FILE_VALIDATE", fileValidate,
            "TRANSFORM_INBOUND", transform,
            "BUSINESS_VALIDATE", businessValidate,
            "IDEMPOTENCY_CHECK", idempotency,
            "FINACLE_POST", finaclePost,
            "EMIT_AUDIT", audit,
            "TRANSFORM_ACK", ackGenerate,
            "DELIVER_ACK", ackDeliver
        );
    }

    public Processor get(String stepCode) {
        Processor processor = steps.get(stepCode);
        if (processor == null) {
            throw new UnknownStepCodeException(stepCode);
        }
        return processor;
    }

    public Set<String> registeredStepCodes() {
        return steps.keySet();
    }
}
```

## 8. Bulk Processing Pattern

For high-volume file processing, use Camel splitting with streaming:

```java
from("direct:process-payment-batch")
    .routeId("bulk-payment-split")
    .split(body(), new PaymentAggregationStrategy())
        .streaming()
        .parallelProcessing()
        .executorService(paymentExecutor)
        .to("direct:process-single-payment")
    .end()
    .to("direct:aggregate-batch-result");
```

| Setting                | Value                    | Rationale                                                      |
| ---------------------- | ------------------------ | -------------------------------------------------------------- |
| `streaming()`          | Enabled                  | Avoid loading entire file into memory                          |
| `parallelProcessing()` | Configurable per partner | Throughput vs Finacle capacity                                 |
| Thread pool size       | From profile config      | DB-driven: `profile.getProcessingConfig().getThreadPoolSize()` |
| Batch size             | From profile config      | DB-driven: `profile.getProcessingConfig().getBatchSize()`      |

## 9. Message Headers Convention

| Header              | Set By                         | Purpose             |
| ------------------- | ------------------------------ | ------------------- |
| `X-Correlation-Id`  | CorrelationIdProcessor         | End-to-end trace    |
| `X-Partner-Id`      | PartnerIdentificationProcessor | Partner routing     |
| `X-Message-Type`    | PartnerIdentificationProcessor | Profile resolution  |
| `X-Batch-Id`        | FileIngestRoute                | Batch tracking      |
| `X-Idempotency-Key` | IdempotencyProcessor           | Duplicate detection |

## 10. Exchange Properties Convention

| Property            | Type               | Purpose                             |
| ------------------- | ------------------ | ----------------------------------- |
| `h2h.correlationId` | String             | Trace ID                            |
| `h2h.partnerId`     | String             | Partner identifier                  |
| `h2h.messageType`   | String             | Message type code                   |
| `h2h.profile`       | IntegrationProfile | Resolved config                     |
| `h2h.steps`         | List\<RouteStep>   | Pipeline steps                      |
| `h2h.stepIndex`     | Integer            | Current step position               |
| `h2h.currentStep`   | String             | Current step code                   |
| `h2h.status`        | String             | SUCCESS, FAILED, PARTIAL, DUPLICATE |
| `h2h.duplicate`     | Boolean            | Idempotency result                  |
| `h2h.retryable`     | Boolean            | Error classification                |
| `h2h.manualReview`  | Boolean            | Needs human intervention            |

## 11. Testing Camel Routes

```java
@CamelSpringBootTest
class BulkPaymentRouteTest {

    @Autowired private CamelContext camelContext;
    @Autowired private ConfigResolver configResolver;

    @MockBean private CoreBankingIntegration coreBanking;

    @Test
    void shouldProcessBulkPaymentFile() throws Exception {
        // Setup mock config
        when(configResolver.resolve(any())).thenReturn(testProfile);

        MockEndpoint finacleMock = camelContext.getEndpoint("mock:finacle", MockEndpoint.class);
        finacleMock.expectedMessageCount(3);

        template.sendBody("direct:execute-profile", testFileContent);

        finacleMock.assertIsSatisfied();
    }
}
```

## 12. Related Documents

* [Design Patterns](/docs/13-design-patterns.md)
* [Modular JAR Architecture](/docs/03-modular-jar-architecture.md)
* [Database-Driven Configuration](/docs/04-database-driven-configuration.md)
* [Technology Stack](/docs/02-technology-stack.md)


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://host2host.onibonje.com/docs/08-camel-integration-patterns.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
