Wednesday, 15 July 2015

java - Camel aggregation strategy -



java - Camel aggregation strategy -

i parsing csv file, splitting , routing through multiple processors in camel. there 2 endpoints , 1 having erroneous info while other has validated data.

i need suggestion in aggregating data.

let's csv file has 10 records out of 6 reached 1 endpoint while 4 reached another. how can know if 10 has completed file @ each endpoint , move ahead of aggregator. need create 2 files 1 valid info , other corrupt info single file.

lets @ splitter returns.

according documentation on camel 2.2. or older splitter default homecoming lastly split message using illustration lastly line finish processor might not line 10 (using example).

on camel 2.3 , newer splitter default homecoming original input message i.e. 10 lines. default behavior , dont need code work. when splitter finished default pass message along next end point.

so if using next dsl on camel 2.3 or newer:

<camelcontext trace="false" id="blueprintcontext" xmlns="http://camel.apache.org/schema/blueprint"> <route id="splitexample"> <from uri="timer://mytimer?period=2000"/> <setbody> <simple>a\nb\nc</simple> </setbody> <log message="the message body before splitter contains ${body}"/> <split> <tokenize token="\n"></tokenize> <log message="split line ${body}"/> </split> <log message="the message body after splitter contains ${body}"/> </route> </camelcontext>

the next appear in log:

info message body before splitter contains b c info split line info split line b info split line c info message body after splitter contains b c

as can see camel default combines messages 1 after splitter returns. override behavior need implement own aggregator. create class lets phone call myaggregationstrategy , create class implement aggregationstrategy. used illustration in apache documentation here. illustration aggregate incoming bids , want aggregate highest bid.

private static class myaggregationstrategy implements aggregationstrategy { public exchange aggregate(exchange oldexchange, exchange newexchange) { if (oldexchange == null) { // first time have new exchange wins first round homecoming newexchange; } int oldprice = oldexchange.getin().getbody(integer.class); int newprice = newexchange.getin().getbody(integer.class); // homecoming "winner" has highest cost homecoming newprice > oldprice ? newexchange : oldexchange; } }

after have done tell splitter utilize aggregator doing following:

spring/xml dsl:

<split strategyref="myaggregationstrategy ">

in java:

from("direct:start") // aggregated header id , utilize our own strategy how aggregate .aggregate(new myaggregationstrategy())

hopefully gives plenty insight how splitter works. in case set header value each line indicating if successful or failed utilize client aggregator create new message failed , success grouped 2 lists message body. 1 list failed , 1 list completed line items.

this new aggregated message can sent processor or endpoint farther processing. illustration can take failed list , send route produces file. seda component can help lot here.

java apache-camel

No comments:

Post a Comment