Những người phát triển phần mềm như chúng ta hiểu rõ rằng để có thể tích hợp được 1 hệ thống đầy đủ các loại thành phần chạy thống suốt với nhau là 1 điều không hề dễ dàng. Để các hệ thống IT làm việc và giao tiếp được với nhau, chúng ta cần xây dựng giao thức và nó sẽ làm tiêu tốn khá nhiều thời gian của bạn.

Lý do đó, người ta đưa ra khái niệm chuẩn Enterprise Integration Patterns, giới thiệu, mô tả và hướng dẫn xử lý các vấn đề thường gặp trong việc tích hợp hệ thống. Tuy nhiên, để tuân theo EIP cũng không hề dễ dàng. Chúng ta có một cách khác để tuân theo chuẩn EIP là sử dụng framework tích hợp Apache Camel, với mục tiêu chính là làm cho việc tích hợp này dễ dàng hơn. Camel cung cấp rất nhiều component cho chúng ta sử dụng và sử dụng khái niệm route để thực hiện việc định tuyến luồng dữ liệu khi tích hợp các thành phần lại với nhau. Chúng ta sẽ tìm hiểu dần dần các khái niệm và cách thức sử dụng nó sau đây.

SƠ LƯỢC VỀ CAMEL

  • Camel là 1 framework tích hợp được xây dựng với mục đích hỗ trợ cho việc tích hợp, liên kết các thành phần hệ thống trong dự án của bạn hiệu quả và thoải mái hơn.
  • Camel thực chất là một công cụ định tuyến, cho phép bạn định nghĩa các rule dịnh tuyến, quyết định việc lấy nguồn dữ liệu từ đâu, với điều kiện gì, thực hiện xử lý nó như thế nào và gửi các dữ liệu đó đi đâu.
  • Camel sử dụng ngôn ngữ hỗ trợ tích hợp Java DSL (domain-specific language) dạng Java hoặc XML cho phép bạn định nghĩa được các luật phức tạp, thỏa mãn được hầu hết các yêu cầu business đề ra.

Java DSL

from("file:data/inbox").to("jms:queue:order");

Spring DSL

<route>
    <from uri="file:data/inbox" />
    <to uri="jms:queue:order" />
</route>

 

CÁC THÀNH PHẦN CỦA CAMEL

Routing engine

Nó thực chất là việc chuyển các dữ liệu message ngầm bên dưới.

Route

Là thành phần trong 1 chuỗi các công đoạn xử lý dữ liệu, cho phép chúng ta dễ dàng định nghĩa việc đón nhận, xử lý và trung chuyển dữ liệu.

Ngôn ngữ DSL

Sử dụng để định nghĩa ra 1 route. Ví dụ:

from("file:data/inbox")
        .filter().xpath("/order[not(@test)]")
        .to("jms:queue:order")

hoặc

<route>
    <from uri="file:data/inbox" />
    <filter>
        <xpath>/order[not(@test)]</xpath>
        <to uri="jms:queue:order" />
    </filter>
</route>

Processor

Định nghĩa việc sử dụng, tạo hay sửa đổi dữ liệu mesage đến.

Component

Sử dụng để định nghĩa các loại định tuyến khác nhau như file (FileComponent), ftp (FtpComponent), ..

Hiện tại có hơn 80 loại component có sẵn, xem chi tiết tại: http://camel.apache.org/components.html

Và các thành phần khác như: Endpoint, Producer, Consumer.

 

BẮT ĐẦU VỚI CAMEL

Hãy xem ví dụ đơn giản sau: Cần thiết xây dựng 1 chương trình tự động dò tìm các file có trong thư mục data/inbox. Nếu có, copy chúng qua thư mục data/outbox.

 

Plain Java

public class FileCopier {
    public static void main(String args[]) throws Exception {
        File inboxDirectory = new File("data/inbox");
        File outboxDirectory = new File("data/outbox");
        outboxDirectory.mkdir();
        File[] files = inboxDirectory.listFiles();
        for (File source : files) {
            if (source.isFile()) {
                File dest = new File(outboxDirectory.getPath() + File.separator
                        + source.getName());
                copyFIle(source, dest);
            }
        }
    }

    private static void copyFile(File source, File dest) throws IOException {
        OutputStream out = new FileOutputStream(dest);
        byte[] buffer = new byte[(int) source.length()];
        FileInputStream in = new FileInputStream(source);
        in.read(buffer);
        try {
            out.write(buffer);
        } finally {
            out.close();
            in.close();
        }
    }
}

Apache Camel giải quyết vấn đề này đơn giản như sau:

public class FileCopierWithCamel {
    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("file:data/inbox?noop=true").to("file:data/outbox");
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
    }
}

Tích hợp Camel vào project bằng cách thêm vào pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>${camel-version}</version>
    </dependency>
</dependencies>

 

TÍCH HỢP VỚI ACTIVEMQ

Camel hỗ trợ hoàn hảo trong việc điều hướng các JMS mesage thông qua ConnectionFactory. ActiveMQ là một trong những JMS provider phổ biến hiện nay. Cách tích hơp như sau:

Định nghĩa ConnectionFactory cho ActiveMQ

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

Khởi tạo CamelContext

CamelContext context = new DefaultCamelContext();
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

Bổ sung component camel-jms và thư viện activemq vào pom.xml

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jms</artifactId>
    <version>2.5.0</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.3.2</version>
</dependency>

Sử dụng DSL để gửi message cần gửi sang JMS queue, bằng cách sử dụng từ khóa to. Như thế này, nó sẽ đẩy vào queue có tên là incomingOrders:

...to("jms:queue:incomingOrders")

 

ĐỊNH NGHĨA ROUTE

Thông qua RouteBuilder. Ví dụ sau định nghĩa việc bắt các file có trên FTP server, và chuyển chúng vào JMS queue:

public class FtpToJMSExample {
    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "vm://localhost");
        context.addComponent("jms",
                JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("ftp://rider.com/orders" + "?username=rider&password=secret")
                    .process(new Processor() {
                            public void process(Exchange exchange)
                                    throws Exception {
                                System.out.println("We just downloaded: "
                                        + exchange.getIn().getHeader(
                                                "CamelFileName"));
                            }
                        })
                    .to("jms:incomingOrders");
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
    }
}

Trong ví dụ trên, từ khóa process dùng để định nghĩa việc xử lý dữ liệu trung gian giữa việc lấy dữ liệu đến (from) và gửi dữ liệu đi (to).

 

TÍCH HỢP VỚISPRING

Thay vì sử dụng Java DSL như các ví dụ trên, ta có thể sử dụng Spring DSL. Ưu điểm của việc sử dụng Spring DSL là toàn bộ việc định nghĩa sẽ giao cho spring quản lý, ta không cần quan tâm gì thêm về việc kích hoạt... 

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://localhost" />
            </bean>
        </property>
    </bean>
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="ftp://rider.com/orders?username=rider&password=secret" />
            <to uri="jms:incomingOrders" />
        </route>
    </camelContext>
</beans>

 

CÁC VẤN ĐỀ ROUTING VÀ EIP CƠ BẢN

1. Sử dụng các kiểu content-base router

Sử dụng từ khóa DSL when để switch luồng dữ liệu theo điều kiện

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("file:src/data?noop=true").to("jms:incomingOrders");

                from("jms:incomingOrders").choice()
                        .when(header("CamelFileName").endsWith(".xml")).to("jms:xmlOrders")
                        .when(header("CamelFileName").endsWith(".csv")).to("jms:csvOrders");

                from("jms:xmlOrders").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received XML order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });

                from("jms:csvOrders").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received CSV order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });
            }
        });

Sử dụng mệnh đề otherwise (như else)

        from("jms:incomingOrders").choice()
                .when(header("CamelFileName").endsWith(".xml")).to("jms:xmlOrders")
                .when(header("CamelFileName").regex("^.*(csv|csl)$")).to("jms:csvOrders")
                .otherwise().to("jms:badOrders");

Thực hiện routing sau content-base router

        from("jms:incomingOrders").choice()
                .when(header("CamelFileName").endsWith(".xml")).to("jms:xmlOrders")
                .when(header("CamelFileName").regex("^.*(csv|csl)$")).to("jms:csvOrders")
                .otherwise().to("jms:badOrders")
        .end()
        .to("jms:continuedProcessing");

Hoặc chặn đứng không tiếp tục routing sau content-base router

        from("jms:incomingOrders").choice()
                .when(header("CamelFileName").endsWith(".xml")).to("jms:xmlOrders")
                .when(header("CamelFileName").regex("^.*(csv|csl)$")).to("jms:csvOrders")
                .otherwise().to("jms:badOrders").stop()
        .end()
        .to("jms:continuedProcessing");

Nếu chuyển từ Java DSL sang implement bằng Sping DSL thì viết như sau:

<route>
    <from uri="jms:incomingOrders" />
    <choice>
        <when>
            <simple>${header.CamelFileName} regex '^.*xml$'</simple>
            <to uri="jms:xmlOrders" />
        </when>
        <when>
            <simple>${header.CamelFileName} regex '^.*(csv|csl)$'</simple>
            <to uri="jms:csvOrders" />
        </when>
        <otherwise>
            <to uri="jms:badOrders" />
            <stop />
        </otherwise>
    </choice>
    <to uri="jms:continuedProcessing" />
</route>

2. Sử dụng mesage filter để lọc bớt dữ liệu đầu vào

Java DSL

        from("jms:xmlOrders")
        .filter(xpath("/order[not(@test)]"))
        .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received XML order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });

Spring DSL

<route>
    <from uri="jms:xmlOrders" />
    <filter>
        <xpath>/order[not(@test)]</xpath>
        <process ref="orderLogger" />
    </filter>
</route>

3. Multicasting, gửi dữ liệu broadcast đến nhiều đích

Gửi tuần tự

from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production");

Gửi song song

from("jms:xmlOrders").multicast().parallelProcessing().to("jms:accounting", "jms:production");

Gửi song song, có ấn định thread pool size, dừng chế độ multicast nếu gặp exception

ExecutorService executor = Executors.newFixedThreadPool(16);
from("jms:xmlOrders")
.multicast().stopOnException().parallelProcessing().executorService(executor)
.to("jms:accounting", "jms:production");

Chuyển sang Spring DSL như sau

<route>
    <from uri="jms:xmlOrders" />
    <multicast stopOnException="true" parallelProcessing="true"
        executorServiceRef="executor">
        <to uri="jms:accounting" />
        <to uri="jms:production" />
    </multicast>
</route>

<bean id="executor" class="java.util.concurrent.Executors"
    factory-method="newFixedThreadPool">
    <constructor-arg index="0" value="16" />
</bean>

4. Thiết định danh sách đối tượng nhận khi gửi dữ liệu

Java DSL

from("jms:xmlOrders")
.recipientList(header("recipients"));

Spring DSL

<route>
    <from uri="jms:xmlOrders" />
    <recipientList>
        <header>recipients</header>
    </recipientList>
</route>

5. Sử dụng kỹ thuật wiredTap (nge trộm dữ liệu)

Kỹ thuật này nghe khá lạ, nhưng thật sự hữu ích nếu bạn cần trích xuất phần dữ liệu phục vụ cho việc debugging, testing, lưu dữ liệu vào DB trước khi xử lý...

Published rồi mà sợ chỗ này khó hiểu nên viết tiếp. Nhìn ví dụ bên dưới, nếu không có wiredTap thì dữ liệu từ from sẽ đi vào luồng choice (content-base router) như thông lệ. Vậy thì, nếu thêm wiredTap, dữ liệu trước khi vào choice sẽ đi vào jms queue có tên là incomingOrders trước. Ta lại tiếp tục định nghĩa 1 route khác lắng nge from từ jms incomingOrders, tạm tạm là sysout ra để check (phục vụ debugging hoặc testing...). Tức là kiểm tra xem nhận được cái gì đã, ok rồi thì kiểm tra tiếp xem choice có ok không nữa... kiểu là vậy

Java DSL

        from("jms:incomingOrders")
                .wireTap("jms:orderAudit").choice()
                .when(header("CamelFileName").endsWith(".xml")).to("jms:xmlOrders")
                .when(header("CamelFileName").regex("^.*(csv|csl)$")).to("jms:csvOrders")
                .otherwise().to("jms:badOrders");

Spring DSL

<route>
    <from uri="jms:incomingOrders" />
    <wireTap uri="jms:orderAudit" />
    ..
</route>

 

MÃ NGUỒN VÀ TÀI LIỆU THAM KHẢO

Mã nguồn

Tải project từ Github của chính team Camel viết tại: https://github.com/camelinaction/camelinaction

 

Tham khảo

- Camel in Action: http://ait2.iit.uni-miskolc.hu/oktatas/lib/exe/fetch.php?id=tanszek%3Aoktatas%3Ainformacios_rendszerek_integralasa%3Ainformatikai_rendszerek_epitese&cache=cache&media=tanszek:oktatas:informacios_rendszerek_integralasa:camelinaction.pdf


Leave a Comment

Fields with * are required.