My Works

Java8 based Asynchronous Examples.

Asynchronous literally means not synchronous. We expect to fire and forget a request and somehow expect to be notified when the process is complete. Normally In this model, the association between a request processing thread and client connection is broken. Asynchronous actions normally follow a non-blocking execution therefore the program is allowed to continue processing.

We are going to look at the asynchronous client processing in two different contexts with a REST server. One is using JAX-RS 2.0 with Java8 CompletableFuture and Lambda expressions. In this asynchronous mode the thread is returned to the thread pool before request processing is completed. Request processing is then continued in another thread, called a Worker. Second one using asynchronous non-blocking I/O (NIO) API of Netty .This means one Connection to provide for several calls.This works through listeners and events using low level data transfer and mechanism. The events are notified using Selectors in NIO. Netty has a construct Bootstrap which wraps Channel that we get from a ChannelFactory and also defines the ChannelPipeline thus help in connection. All Netty methods in asynchronous return a ChannelFuture and by adding ChannelFutureListener to ChannelFuture you could probe outcome of the operation.

  1. Create Parent Project with Modules.
  2. Create Module RestServer with Module Dependencies.
  3. Create web.xml for RestServer.
  4. Create PositiveNewsResource and ProgressiveNewsResource.
  5. Create JAXCodeGen.
  6. Create RestServerUtility.
  7. Generate Objects.
  8. Create Integration tests for Resources and JAXCodeGen.
  9. Create Module AsyncClient with Module Dependencies.
  10. Create LoggingFilter Class.
  11. Create CompleteFutureClientCall Class.
  12. Create Integration tests for AsyncClient.
  13. Create Module NettyHttpClient with Module Dependencies.
  14. Create interface NewsChannelMessages.
  15. Create NewsChannelMessagesImpl.
  16. Create NettyClientHandler.
  17. Create NettyClientInitializer.
  18. Create NettyClient Class.
  19. Create Integration tests for NettyClient.
  20. Run the Junit Tests and Check the data.
  21. Download the Example.

1.Create Parent Project with Modules.

Create a project folder Java8AsynchronousExample containing a parent POM. Add RestServer, AsyncClient and NettyHttpClient as modules in the Parent POM. Please Refer this link for some more information on Multi Module Project.



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>AsynchronousExample</groupId>
<artifactId>Java8AsynchronousExample</artifactId>
<version>1.0</version>
<packaging>pom</packaging>

<modules>
<module>RestServer</module>
<module>AsyncClient</module>
<module>NettyHttpClient</module>
</modules>

</project>


2.Create Module RestServer with Module Dependencies.

Create a module RestServer with Project dependencies described as JDK 1.8, jersey-bundle.jar , jaxb-xjc.jar, logging jars, junit jar are the ones that would be needed. The pom file dependencies are listed below.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<parent>
<groupId>AsynchronousExample</groupId>
<artifactId>Java8AsynchronousExample</artifactId>
<version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.srinivas.siteworks</groupId>
<artifactId>RestServer</artifactId>
<packaging>war</packaging>

<properties>
<spring.version>3.2.3.RELEASE</spring.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<JAVA_1_8_HOME>C:\Program Files\Java\jdk1.8.0_20</JAVA_1_8_HOME>
</properties>

<repositories>

<!-- spring bundle repository -->
<repository>
<id>com.springsource.repository.bundles.release</id>
<name>SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/release</url>
</repository>

<repository>
<id>com.springsource.repository.bundles.external</id>
<name>SpringSource Enterprise Bundle Repository - External Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/external</url>
</repository>

<!-- jboss Repository -->
<repository>
<id>JBoss</id>
<name>JBoss Repsitory</name>
<layout>default</layout>
<url>http://repository.jboss.org/maven2</url>
</repository>

</repositories>

<dependencies>

<!-- log library slf4j-bridge for commons-logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<!-- end log library slf4j-bridge for commons-logging -->

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>

<!--Jax-RS dependencies -->
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
<version>1.18.3</version>
</dependency>
<!-- end Jax-RS dependencies -->

<!--Schema to objects dependencies -->
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-xjc</artifactId>
<version>2.2.7</version>
</dependency>
<!--Schema to objects dependencies -->

</dependencies>

<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<!-- Plugin de Jetty para deploy -->
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>6.1.26</version>
<configuration>
<webApp> ${project.build.directory}/${project.build.finalName}
</webApp>
<scanIntervalSeconds>10</scanIntervalSeconds>
</configuration>
</plugin>

<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>2.1.1</version>
<configuration>
<failOnMissingWebXml>true</failOnMissingWebXml>
<webResources>
<resource>
<directory>src\main\webapp</directory>
</resource>
</webResources>
</configuration>
</plugin>

</plugins>
<finalName>RestServer</finalName>
</build>

<reporting>
<plugins>

<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jxr-maven-plugin</artifactId>
<version>2.3</version>
</plugin>

<plugin>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.16</version>
</plugin>

</plugins>
</reporting>

</project>


3.Create web.xml for RestServer.

Create web.xml in the webapp/WEB-INF folder.Then refer the servlet-class with Jersey ServletContainer class in the web.xml.


<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>RestServer</display-name>

<servlet>
<servlet-name>Jersey Web Application</servlet-name>
<servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
<init-param>
<param-name>com.sun.jersey.config.property.packages</param-name>
<param-value>org.srinivas.siteworks.restserver</param-value>
</init-param>
<load-on-startup>1</load-on-startup>

</servlet>
<servlet-mapping>
<servlet-name>Jersey Web Application</servlet-name>
<url-pattern>/feeds/*</url-pattern>
</servlet-mapping>

</web-app>

4.Create PositiveNewsResource and ProgressiveNewsResource.

In this Example a couple of resources are required for the clients to connect asynchronously.Now Create a class PositiveNewsResource and ProgressiveNewsResource in RestServer project, these classes provide as resource for their own set of News Items.The value of @Path annotation is an expression that denotes a relative URI to the context root of the JAX-RS application .It is the base URI of WAR that browsers and remote clients use. Please Refer this User guide for some more information on Jersey.


@Path("/newssource/positivenews")
public class PositiveNewsResource {

@GET
@Produces(MediaType.APPLICATION_XML)
public String PositiveNews() {
StringWriter stringWriter = new StringWriter();
Item javaItem = new Item();
javaItem.setCategory("World News");
javaItem.setTitle("Java");
javaItem.setDescription("Java Technologies");
javaItem.setLink("http://www.techbrightworks.com/");
javaItem.setPubDate(RestServerUtility.getCurrentDate());
javaItem.setGuid("http://www.techbrightworks.com/");
Item sportsItem = new Item();
sportsItem.setCategory("Sports News");
sportsItem.setTitle("Sports");
sportsItem.setDescription("Cricket World Champion");
sportsItem.setLink("http://www.techbrightworks.com/");
sportsItem.setPubDate(RestServerUtility.getCurrentDate());
sportsItem.setGuid("http://www.techbrightworks.com/");
Item politicsItem = new Item();
politicsItem.setCategory("Politics News");
politicsItem.setTitle("Politics");
politicsItem.setDescription("Party wins Elections");
politicsItem.setLink("http://www.techbrightworks.com/");
politicsItem.setPubDate(RestServerUtility.getCurrentDate());
politicsItem.setGuid("http://www.techbrightworks.com/");
NewsItems newsItems = new NewsItems();
newsItems.getItem().add(javaItem);
newsItems.getItem().add(sportsItem);
newsItems.getItem().add(politicsItem);
NewsChannel newsChannel = new NewsChannel();
newsChannel.setTitle("Positive News");
newsChannel.setNewsItems(newsItems);
RestServerUtility.newsChannelToXML(newsChannel, stringWriter);
return stringWriter.toString();
}
}



@Path("/newssource/progressivenews")
public class ProgressiveNewsResource {

@GET
@Produces(MediaType.APPLICATION_XML)
public String ProgressiveNews() {
StringWriter stringWriter = new StringWriter();
Item javaItem = new Item();
javaItem.setCategory("Tech News");
javaItem.setTitle("Java");
javaItem.setDescription("Java Technologies");
javaItem.setLink("http://www.techbrightworks.com/");
javaItem.setPubDate(RestServerUtility.getCurrentDate());
javaItem.setGuid("http://www.techbrightworks.com/");
Item sportsItem = new Item();
sportsItem.setCategory("Cricket News");
sportsItem.setTitle("Sports");
sportsItem.setDescription("Cricket World Champion");
sportsItem.setLink("http://www.techbrightworks.com/");
sportsItem.setPubDate(RestServerUtility.getCurrentDate());
sportsItem.setGuid("http://www.techbrightworks.com/");
Item politicsItem = new Item();
politicsItem.setCategory("Election News");
politicsItem.setTitle("Politics");
politicsItem.setDescription("Party wins Elections");
politicsItem.setLink("http://www.techbrightworks.com/");
politicsItem.setPubDate(RestServerUtility.getCurrentDate());
politicsItem.setGuid("http://www.techbrightworks.com/");
NewsItems newsItems = new NewsItems();
newsItems.getItem().add(javaItem);
newsItems.getItem().add(sportsItem);
newsItems.getItem().add(politicsItem);
NewsChannel newsChannel = new NewsChannel();
newsChannel.setTitle("Progressive News");
newsChannel.setNewsItems(newsItems);
RestServerUtility.newsChannelToXML(newsChannel, stringWriter);
return stringWriter.toString();
}
}


5.Create JAXCodeGen.

A generic schema NewsSource.xsd for news objects is provided to enable the clients to map to Java objects.To serve this purpose create the class JAXCodeGen and provide a method that generates Java objects based on a schema.


public class JAXCodeGen {

public void generate(String outputDirectory, String packageName, String xsdName) throws IOException, URISyntaxException {
// Setup schema compiler
SchemaCompiler sc = XJC.createSchemaCompiler();
sc.forcePackageName(packageName);
// Setup SAX InputSource
File schemaFile = new File(getClass().getClassLoader().getResource(xsdName).getFile());
InputSource is = new InputSource(schemaFile.toURI().toString());
// Parse and build
sc.parseSchema(is);
S2JJAXBModel model = sc.bind();
JCodeModel jCodeModel = model.generateCode(null, null);
jCodeModel.build(new File(outputDirectory));
}
}


6.Create RestServerUtility.

Create RestServerUtility class which provides the utility methods to convert Java objects to XML as the response from RestServer would be a XML.


public class RestServerUtility {
private static final Logger log = LoggerFactory.getLogger(RestServerUtility.class);

public static void itemToXML(Item item, StringWriter stringWriter) {
try {
JAXBContext jaxbContext = JAXBContext.newInstance(Item.class);
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
jaxbMarshaller.marshal(item, stringWriter);
} catch (PropertyException e) {
log.info("Error", e);
} catch (JAXBException e) {
log.info("Error", e);
}
}

public static void newsChannelToXML(NewsChannel newsChannel, StringWriter stringWriter) {
try {
JAXBContext jaxbContext = JAXBContext.newInstance(NewsChannel.class);
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
jaxbMarshaller.marshal(newsChannel, stringWriter);
} catch (PropertyException e) {
log.info("Error", e);
} catch (JAXBException e) {
log.info("Error", e);
}
}

public static XMLGregorianCalendar getCurrentDate() {
GregorianCalendar gcal = (GregorianCalendar) GregorianCalendar.getInstance();
gcal.setTime(new Date());
XMLGregorianCalendar xgcal = null;
try {
xgcal = DatatypeFactory.newInstance().newXMLGregorianCalendar(gcal);
} catch (DatatypeConfigurationException e) {
log.info("Error", e);
}
return xgcal;
}
}


7.Generate Objects.

Now that the NewsSource.xsd and JAXCodeGen is available .The Item , NewsChannel, NewsItems objects could be generated. In the current client modules to make each client module independent the generated Item , NewsChannel and NewsItems objects are added in a package of each of AsyncClient and NettyHttpClient projects. The unit test testJAXCodeGen in the below point can be followed to generate Objects manually.

8.Create Integration tests for Resources and JAXCodeGen.

Create tests for RestServer covering output of resources and Java Objects Generation by JAXCodeGen.The test testJAXCodeGen() covering Java Objects Generation by JAXCodeGen indicates execution required to Generate Objects in the previous Step. Also for the purpose of the regular tests the Objects are generated in temp folder of the directory src/test/java. These files in the temp folder are deleted once unit test assertion is completed within the test testJAXCodeGen().


public class RestServerTest extends TestCase {
private static final String FILE_NAME_STARTS_WITH_NEWS_CHANNEL = "NewsChannel";
private static final String DELETE_FOLDER_TEMP = File.separator +"temp";
private static final String JAXB_CODE_GEN_DIR_PACKAGE_PATH_TEMP_GENERATED = File.separator + "temp" + File.separator + "generated";
private static final String JAXB_CODE_GEN_XSD_NAME_NEWS_SOURCE_XSD = "NewsSource.xsd";
private static final String JAXB_CODE_GEN_PACKAGE_NAME_TEMP_GENERATED = "temp.generated";
private static final String JAXB_CODE_GEN_OUTPUT_DIRECTORY_SRC_TEST_JAVA = "src" + File.separator + "test" + File.separator + "java";
private PositiveNewsResource positiveNewsResource;
private ProgressiveNewsResource progressiveNewsResource;
private static final Logger log = LoggerFactory.getLogger(RestServerTest.class);

public void setUp() {
positiveNewsResource = new PositiveNewsResource();
progressiveNewsResource = new ProgressiveNewsResource();
}

@Test
public void testPositiveNewsResource() throws Exception {
String xmlstring = positiveNewsResource.PositiveNews();
NewsChannel nc = xmlToNewsChannel(xmlstring);
assertEquals(nc.getTitle(), "Positive News");
}

@Test
public void testProgressiveNewsResource() throws Exception {
String xmlstring = progressiveNewsResource.ProgressiveNews();
NewsChannel nc = xmlToNewsChannel(xmlstring);
assertEquals(nc.getTitle(), "Progressive News");
}

@Test
public void testJAXCodeGen() throws Exception {
JAXCodeGen jaxCodeGen = new JAXCodeGen();
jaxCodeGen.generate(JAXB_CODE_GEN_OUTPUT_DIRECTORY_SRC_TEST_JAVA, JAXB_CODE_GEN_PACKAGE_NAME_TEMP_GENERATED, JAXB_CODE_GEN_XSD_NAME_NEWS_SOURCE_XSD);
File dir = new File(new File("").getAbsolutePath() + File.separator + JAXB_CODE_GEN_OUTPUT_DIRECTORY_SRC_TEST_JAVA + JAXB_CODE_GEN_DIR_PACKAGE_PATH_TEMP_GENERATED);
dir.listFiles();
FilenameFilter filter = new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith(FILE_NAME_STARTS_WITH_NEWS_CHANNEL);
}
};
String[] children = dir.list(filter);
assertTrue(children.length == 1);
deleteTempFile(new File(new File("").getAbsolutePath() + File.separator + JAXB_CODE_GEN_OUTPUT_DIRECTORY_SRC_TEST_JAVA + DELETE_FOLDER_TEMP));
}

private NewsChannel xmlToNewsChannel(String xmlString) {
NewsChannel newsChannel = new NewsChannel();
try {
JAXBContext jaxbContext = JAXBContext.newInstance(NewsChannel.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
newsChannel = (NewsChannel) jaxbUnmarshaller.unmarshal(new StringReader(xmlString));
} catch (PropertyException e) {
log.info(e.getMessage());
} catch (JAXBException e) {
log.info(e.getMessage());
}
return newsChannel;
}

private void deleteTempFile(File tempFile) {
try {
if (tempFile.isDirectory()) {
File[] entries = tempFile.listFiles();
for (File currentFile : entries) {
deleteTempFile(currentFile);
}
tempFile.delete();
} else {
tempFile.delete();
}
log.info("DELETED Temporal File: " + tempFile.getPath());
} catch (Throwable t) {
log.error("Could not DELETE file: " + tempFile.getPath(), t);
}
}
}


9.Create Module AsyncClient with Module Dependencies.

Create a module AsyncClient with Project dependencies described as JDK 1.8, jersey-client.jar , logging jars, junit jar are the ones that would be needed. The pom file dependencies are listed below.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<parent>
<groupId>AsynchronousExample</groupId>
<artifactId>Java8AsynchronousExample</artifactId>
<version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.srinivas.siteworks</groupId>
<artifactId>AsyncClient</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>AsyncClient</name>

<properties>
<spring.version>3.2.3.RELEASE</spring.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<javac.version>1.8</javac.version>
</properties>

<repositories>

<!-- spring bundle repository -->
<repository>
<id>com.springsource.repository.bundles.release</id>
<name>SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/release</url>
</repository>

<repository>
<id>com.springsource.repository.bundles.external</id>
<name>SpringSource Enterprise Bundle Repository - External Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/external</url>
</repository>

<!-- jboss Repository -->
<repository>
<id>JBoss</id>
<name>JBoss Repsitory</name>
<layout>default</layout>
<url>http://repository.jboss.org/maven2</url>
</repository>

</repositories>

<dependencies>

<!-- log library slf4j-bridge for commons-logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<!-- end log library slf4j-bridge for commons-logging -->

<!-- junit dependency -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
<!-- end junit dependency -->

<!-- JAX RS jersey Client dependency -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.5</version>
</dependency>
<!-- end JAX RS jersey Client dependency -->

</dependencies>

<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
</plugin>

<!-- Integrative Tests dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.srinivas.siteworks</groupId>
<artifactId>RestServer</artifactId>
<version>1.0</version>
<type>war</type>
<overWrite>true</overWrite>
<outputDirectory>target/webapps</outputDirectory>
<destFileName>RestServer.war</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>6.1.26</version>
<configuration>
<scanIntervalSeconds>10</scanIntervalSeconds>
<stopKey>foo</stopKey>
<stopPort>9999</stopPort>
<contextPath>/RestServer</contextPath>
<webApp>
${basedir}/target/webapps/RestServer.war
</webApp>
</configuration>
<executions>
<execution>
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
<goal>deploy-war</goal>
</goals>
<configuration>
<scanIntervalSeconds>0</scanIntervalSeconds>
<daemon>true</daemon>
</configuration>
</execution>
<execution>
<id>stop-jetty</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
<configuration>
<excludes>
<exclude>**/*$*</exclude>
<exclude>**/*CompleteFutureClientCallTest.java</exclude>
</excludes>
</configuration>
</plugin>

<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.6</version>
<configuration>
<encoding>UTF-8</encoding>
<includes>
<include>**/CompleteFutureClientCallTest.java</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- End Integrative Tests dependencies -->

</plugins>
<finalName>AsyncClient</finalName>
</build>

<reporting>
<plugins>

<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jxr-maven-plugin</artifactId>
<version>2.3</version>
</plugin>

<plugin>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.16</version>
</plugin>

</plugins>
</reporting>

</project>


10.Create LoggingFilter Class.

Create Class LoggingFilter.This class helps in logging the requests and responses sent by the client.This enables us to Debug the calls. LoggingFilter implements ClientRequestFilter, ClientResponseFilter.Please Refer this Documentation for some more information on Jersey Filters.


public class LoggingFilter implements ClientRequestFilter, ClientResponseFilter {
private static final String STRING_THREAD_NAME = ":Thread Name ";
private static final String STRING_CLIENT_RESPONSE = "Client Response";
private static final String STRING_CLIENT_REQUEST = "Client request";
private static final int ENTITY_SIZE_0 = 0;
private static final Logger logger = Logger.getLogger(LoggingFilter.class.getName());
private static final int MAXIMUM_ENTITY_SIZE = 8 * 1024;

@Override
public void filter(final ClientRequestContext context) throws IOException {
final StringBuilder requestBuilderLog = new StringBuilder();
requestLInePrint(requestBuilderLog, STRING_CLIENT_REQUEST, context.getMethod(), context.getUri());
log(requestBuilderLog);
}

@Override
public void filter(final ClientRequestContext requestContext, final ClientResponseContext responseContext) throws IOException {
final StringBuilder responseBuilderLog = new StringBuilder();
responseLinePrint(responseBuilderLog, STRING_CLIENT_RESPONSE, responseContext.getStatus());
if (responseContext.hasEntity()) {
responseContext.setEntityStream(logInputStream(responseBuilderLog, responseContext.getEntityStream(), Charset.forName("UTF-8")));
}
log(responseBuilderLog);
}

private void log(final StringBuilder builder) {
if (logger != null) {
logger.info(builder.toString());
}
}

private void requestLInePrint(final StringBuilder builder, final String note, final String method, final URI uri) {
builder.append(note).append(STRING_THREAD_NAME).append(Thread.currentThread().getName()).append("\n");
builder.append(method).append(" ").append(uri.toASCIIString()).append("\n");
}

private void responseLinePrint(final StringBuilder builder, final String note, final int status) {
builder.append(note).append(STRING_THREAD_NAME).append(Thread.currentThread().getName()).append("\n");
builder.append(Integer.toString(status)).append("\n");
}

private InputStream logInputStream(final StringBuilder builder, InputStream stream, final Charset charset) throws IOException {
if (!stream.markSupported()) {
stream = new BufferedInputStream(stream);
}
stream.mark(MAXIMUM_ENTITY_SIZE + 1);
final byte[] entity = new byte[MAXIMUM_ENTITY_SIZE + 1];
final int entitySize = stream.read(entity);
builder.append(new String(entity, ENTITY_SIZE_0, Math.min(entitySize, MAXIMUM_ENTITY_SIZE), charset));
if (entitySize > MAXIMUM_ENTITY_SIZE) {
builder.append(" Entity Size Greater");
}
builder.append('\n');
stream.reset();
return stream;
}
}


11.Create CompleteFutureClientCall Class.

Create Class CompleteFutureClientCall in the AsyncClient Project that would provide for Asynchronous client based on java8 CompletableFuture and Lambda expressions. Since the Above provided RestServer does not do Asynchronous Server Side Processing of response.The execution has to check for all responses have been received and provide for call trials.The Results of the call would indicate that Progressive News and Positive News were picked by two different threads. CompletableFuture provides a worker thread to each source and Populates the same CompletableFuture<NewsChannel> newsChannels Object for both the NewsSources.Please Refer this documentation for some more information on CompletableFuture.


public class CompleteFutureClientCall {
public static final int CALLTRIALS_COUNT_START_VALUE_0 = 0;
private static final int MAX_CALL_TRIALS_ALLOWED_10 = 10;
private static final Logger log = LoggerFactory.getLogger(CompleteFutureClientCall.class);
private Client client;
final Predicate<Item> conditionJava = new Predicate<Item>() {
@Override
public boolean test(Item item) {
return item.getTitle().contains("Java");
}
};

public CompleteFutureClientCall() {
client = ClientBuilder.newClient();
client.register(LoggingFilter.class);
}

public List<NewsChannel> executeCompleteFutureClientCall(String[] newsSources) throws InterruptedException, ExecutionException {
List<NewsChannel> newsChannelList = executeNewsSources(newsSources);
return handleUnansweredCall(newsSources, newsChannelList);
}

public List<Item> newsItemFilter(List<Item> items, Predicate<Item> predicate) {
List<Item> result = items.stream().filter(element -> predicate.test(element)).collect(Collectors.toList());
return result;
}

public List<Item> newsChannelItemsFilter(List<NewsChannel> newsChannelsList, Predicate<Item> predicate) {
List<Item> result = new ArrayList<Item>();
newsChannelsList.stream().forEachOrdered(newsSource -> {
Collection<Item> filterResults = newsSource.getNewsItems().getItem().stream().filter(predicate).collect(Collectors.<Item> toList());
result.addAll(filterResults);
});
return result;
}

public Client getClient() {
return client;
}

public void setClient(Client client) {
this.client = client;
}

private List<NewsChannel> executeNewsSources(String[] newsSources) {
List<NewsChannel> newsChannelsList = new ArrayList<NewsChannel>();
CompletableFuture<NewsChannel> newsChannels = callNewsSources(newsSources, client, newsChannelsList);
while (!newsChannels.isDone()) {
// include any of your processing
}
newsChannels.join();
return newsChannelsList;
}

private CompletableFuture<NewsChannel> callNewsSources(String[] newsSources, Client client, List<NewsChannel> newsChannelsList) {
// Lambda Expression not suitable
CompletableFuture<NewsChannel> newsChannels = null;
for (String source : newsSources) {
newsChannels = executeNewsSourceCall(client, newsChannelsList, source);
}
return newsChannels;
}

private CompletableFuture<NewsChannel> executeNewsSourceCall(Client client, List<NewsChannel> newsChannelsList, String source) {
CompletableFuture<NewsChannel> newsChannels;
newsChannels = CompletableFuture.supplyAsync((Supplier<NewsChannel>) () -> {
log.info("Making a call");
NewsChannel result = null;
String xmlString = client.target(source).request().get(String.class);
result = (null == xmlString || xmlString.isEmpty()) ? null : xmlToNewsChannel(xmlString);
if (result != null) {
newsChannelsList.add(result);
}
return result;
}).exceptionally(e -> {
log.info("Error", e);
return null;
});
return newsChannels;
}

private List<NewsChannel> handleUnansweredCall(String[] newsSources, List<NewsChannel> newsChannelList) {
if (IsAllSourcesNotResponded(newsSources, newsChannelList)) {
int callTrials = CALLTRIALS_COUNT_START_VALUE_0;
newsChannelList = executeTrials(newsSources, newsChannelList, callTrials, MAX_CALL_TRIALS_ALLOWED_10);
return newsChannelList;
} else {
return newsChannelList;
}
}

private boolean IsAllSourcesNotResponded(String[] newsSources, List<NewsChannel> newsChannelsList) {
return newsChannelsList.size() < newsSources.length;
}

private List<NewsChannel> executeTrials(String[] newsSources, List<NewsChannel> newsChannelList, int callTrials, int maximumCallTrialsAllowed) {
// Lambda not suitable
if (callTrials <= MAX_CALL_TRIALS_ALLOWED_10) {
for (; callTrials < MAX_CALL_TRIALS_ALLOWED_10; callTrials++) {
log.info("callTrials " + callTrials);
newsChannelList.clear();
newsChannelList = executeNewsSources(newsSources);
if (!IsAllSourcesNotResponded(newsSources, newsChannelList)) {
callTrials = MAX_CALL_TRIALS_ALLOWED_10;
}
}
}
return newsChannelList;
}

private NewsChannel xmlToNewsChannel(String xmlString) {
NewsChannel newsChannel = new NewsChannel();
try {
JAXBContext jaxbContext = JAXBContext.newInstance(NewsChannel.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
newsChannel = (NewsChannel) jaxbUnmarshaller.unmarshal(new StringReader(xmlString));
} catch (PropertyException e) {
log.info("Exception", e);
} catch (JAXBException je) {
log.info("Exception", je);
}
return newsChannel;
}
}


12.Create Integration tests for AsyncClient.

Create tests for AsyncClient covering a Basic asynchronous client call and checking reliability of the call by calling it multiple times.


public class CompleteFutureClientCallTest extends TestCase {
private static final String NEWSSOURCE_PROGRESSIVENEWS_URL = "http://localhost:8080/RestServer/feeds/newssource/progressivenews/";
private static final String NEWSSOURCE_POSITIVENEWS_URL = "http://localhost:8080/RestServer/feeds/newssource/positivenews/";
private ThreadLocal<CompleteFutureClientCall> completeFutureClient;
private static final Logger log = LoggerFactory.getLogger(CompleteFutureClientCallTest.class);

public void setUp() {
completeFutureClient = ThreadLocal.withInitial(CompleteFutureClientCall::new);
}

@Test
public void testexecuteCompleteFutureClientCall() throws Exception {
List<NewsChannel> newsChannelList = completeFutureClient.get().executeCompleteFutureClientCall(new String[] { NEWSSOURCE_PROGRESSIVENEWS_URL, NEWSSOURCE_POSITIVENEWS_URL });
assertEquals(2, newsChannelList.size());
}

@Test
public void testNewsChannelFilterTest() throws Exception {
List<NewsChannel> newsChannelList = completeFutureClient.get().executeCompleteFutureClientCall(new String[] { NEWSSOURCE_PROGRESSIVENEWS_URL, NEWSSOURCE_POSITIVENEWS_URL });
assertEquals(2, newsChannelList.size());
List<Item> filteredResults = completeFutureClient.get().newsChannelItemsFilter(newsChannelList, completeFutureClient.get().conditionJava);
assertEquals(filteredResults.size(), 2);
}

@Test
public void testNewsItemTest() throws Exception {
List<NewsChannel> newsChannelList = completeFutureClient.get().executeCompleteFutureClientCall(new String[] { NEWSSOURCE_PROGRESSIVENEWS_URL, NEWSSOURCE_POSITIVENEWS_URL });
assertEquals(2, newsChannelList.size());
List<Item> filteredResults = completeFutureClient.get().newsItemFilter(newsChannelList.get(0).getNewsItems().getItem(), completeFutureClient.get().conditionJava);
assertEquals(1, filteredResults.size());
}

@Test
public void testexecuteCompleteFuturClientCall50() throws Exception {
IntStream.range(0, 50).forEach(testNumber -> {
log.info("CompleteFuturClientCall:" + testNumber);
setUp();
try {
testexecuteCompleteFutureClientCall();
} catch (Exception e) {
log.info("CompleteFuturClientCall50", e);
fail("CompleteFuturClientCall50: " + e.getMessage());
}
});
}
}


13.Create Module NettyHttpClient with Module Dependencies.

Create a module NettyHttpClient with Project dependencies described as JDK 1.8, netty-all.jar , logging jars, junit jar are the ones that would be needed. The pom file dependencies are listed below.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<parent>
<groupId>AsynchronousExample</groupId>
<artifactId>Java8AsynchronousExample</artifactId>
<version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>org.srinivas.siteworks</groupId>
<artifactId>NettyHttpClient</artifactId>
<version>1.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<javac.version>1.8</javac.version>
</properties>

<repositories>

<!-- spring bundle repository -->
<repository>
<id>com.springsource.repository.bundles.release</id>
<name>SpringSource Enterprise Bundle Repository - SpringSource Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/release</url>
</repository>

<repository>
<id>com.springsource.repository.bundles.external</id>
<name>SpringSource Enterprise Bundle Repository - External Bundle Releases</name>
<url>http://repository.springsource.com/maven/bundles/external</url>
</repository>

<!-- jboss Repository -->
<repository>
<id>JBoss</id>
<name>JBoss Repsitory</name>
<layout>default</layout>
<url>http://repository.jboss.org/maven2</url>
</repository>

<repository>
<id>opencast-public</id>
<url>http://repository.opencastproject.org/nexus/content/repositories/public/</url>
</repository>

</repositories>

<dependencies>

<!-- log library slf4j-bridge for commons-logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<!-- end log libarary slf4j-bridge for commons-logging -->

<!-- Netty dependencies -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.25.Final</version>
</dependency>
<!-- end Netty dependencies -->

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>


</dependencies>

<build>
<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
</plugin>

<!-- Integrative Tests dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.srinivas.siteworks</groupId>
<artifactId>RestServer</artifactId>
<version>1.0</version>
<type>war</type>
<overWrite>true</overWrite>
<outputDirectory>target/webapps</outputDirectory>
<destFileName>RestServer.war</destFileName>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>6.1.26</version>
<configuration>
<scanIntervalSeconds>10</scanIntervalSeconds>
<stopKey>foo</stopKey>
<stopPort>9999</stopPort>
<contextPath>/RestServer</contextPath>
<webApp>
${basedir}/target/webapps/RestServer.war
</webApp>
</configuration>
<executions>
<execution>
<id>start-jetty</id>
<phase>pre-integration-test</phase>
<goals>
<goal>deploy-war</goal>
</goals>
<configuration>
<scanIntervalSeconds>0</scanIntervalSeconds>
<daemon>true</daemon>
</configuration>
</execution>
<execution>
<id>stop-jetty</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
<configuration>
<excludes>
<exclude>**/*$*</exclude>
<exclude>**/*NettyClientTest.java</exclude>
</excludes>
</configuration>
</plugin>

<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.6</version>
<configuration>
<encoding>UTF-8</encoding>
<includes>
<include>**/NettyClientTest.java</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- End Integrative Tests dependencies -->

</plugins>
<finalName>NettyHttpClient</finalName>
</build>

<reporting>
<plugins>

<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jxr-maven-plugin</artifactId>
<version>2.3</version>
</plugin>

<plugin>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>2.16</version>
</plugin>

</plugins>
</reporting>

</project>


14.Create interface NewsChannelMessages.

Create NewsChannelMessages interface to decouple the setting and getting of the NewsChannel and NettyClientHandler from NettyClient.


public interface NewsChannelMessages {

List<NewsChannel> getNewsChannels();

void setNewsChannels(List<NewsChannel> newsChannels);

void addNewsChannel(NewsChannel newsItem);

NettyClientHandler getNettyClientHandler();

void setNettyClientHandler(NettyClientHandler nettyClientHandler);
}


15.Create NewsChannelMessagesImpl.

Create Class NewsChannelMessagesImpl as an Implementation of interface NewsChannelMessages.


public class NewsChannelMessagesImpl implements NewsChannelMessages {
private List<NewsChannel> newsChannels = new ArrayList<NewsChannel>();
private NettyClientHandler nettyClientHandler;

@Override
public List<NewsChannel> getNewsChannels() {
return newsChannels;
}

@Override
public void setNewsChannels(List<NewsChannel> newsChannels) {
this.newsChannels = newsChannels;
}

@Override
public void addNewsChannel(NewsChannel newsItem) {
getNewsChannels().add(newsItem);
}

public NewsChannelMessagesImpl() {
super();
}

@Override
public NettyClientHandler getNettyClientHandler() {
if(nettyClientHandler == null){
nettyClientHandler =	new NettyClientHandler();
}
nettyClientHandler.setNettyClient(this);
return nettyClientHandler;
}

@Override
public void setNettyClientHandler(NettyClientHandler nettyClientHandler) {
this.nettyClientHandler = nettyClientHandler;
}
}


16.Create NettyClientHandler.

Create Class NettyClientHandler. This class extends SimpleChannelInboundHandler whereby helps to read , write responses and errors occurring in a channel. channelRead0 Method reads the Request and Responses thus enabling the NewsChannel Objects to be added to NettyClient.


public class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private static final String EMPTY_STRING = "";
private boolean readComplete;
private StringBuffer xmlValue = new StringBuffer(NettyClientHandler.EMPTY_STRING);
private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
private NewsChannelMessages nettyClient;

public boolean isReadComplete() {
return readComplete;
}

public void setReadComplete(boolean readComplete) {
this.readComplete = readComplete;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
log.info("STATUS: " + response.getStatus());
if ((response.headers().get(HttpHeaders.Names.CONNECTION) != null) && response.headers().get(HttpHeaders.Names.CONNECTION).equals(HttpHeaders.Values.CLOSE)) {
setReadComplete(true);
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
String xmlString = content.content().toString(CharsetUtil.UTF_8);
xmlValue.append(xmlString);
addNewsChannels(xmlString, xmlValue.toString());
System.err.flush();
if (content instanceof LastHttpContent) {
log.info("} END OF CONTENT");
if (isReadComplete()) {
ctx.close();
}
}
}
}

private void addNewsChannels(String xmlString, String xmlValue) {
if (xmlString.trim().contains("<newsChannel>") && xmlString.trim().endsWith("</newsChannel>")) {
NewsChannel result = xmlToNewsChannel(xmlString);
if (result.getNewsItems().getItem().size() > 0) {
getNettyClient().addNewsChannel(result);
log.info("XMLSTRING" + xmlString);
log.info("newschannelsize" + getNettyClient().getNewsChannels().size());
}
setXmlValue(new StringBuffer(EMPTY_STRING));
} else if (xmlValue.trim().contains("<newsChannel>") && xmlValue.trim().endsWith("</newsChannel>")) {
NewsChannel result = xmlToNewsChannel(xmlValue);
if (result.getNewsItems().getItem().size() > 0) {
getNettyClient().addNewsChannel(result);
log.info("XMLVALUE" + xmlValue);
log.info("newschannelsize" + getNettyClient().getNewsChannels().size());
}
setXmlValue(new StringBuffer(EMPTY_STRING));
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

public NewsChannelMessages getNettyClient() {
return nettyClient;
}

public void setNettyClient(NewsChannelMessages nettyClient) {
this.nettyClient = nettyClient;
}

public StringBuffer getXmlValue() {
return xmlValue;
}

public void setXmlValue(StringBuffer xmlValue) {
this.xmlValue = xmlValue;
}

private NewsChannel xmlToNewsChannel(String xmlString) {
NewsChannel newsChannel = new NewsChannel();
try {
JAXBContext jaxbContext = JAXBContext.newInstance(NewsChannel.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
newsChannel = (NewsChannel) jaxbUnmarshaller.unmarshal(new StringReader(xmlString));
} catch (PropertyException e) {
log.info(e.getMessage());
} catch (JAXBException e) {
log.info(e.getMessage());
}
return newsChannel;
}
}


17.Create NettyClientInitializer.

Create Class NettyClientInitializer.This class extends ChannelInitializer whereby help in initializing of the Socket Channel Pipeline with Available and Custom Handlers.


public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final NettyClientHandler nettyClientHandler;


public NettyClientInitializer(SslContext sslCtx,NettyClientHandler nettyClientHandler) {
this.sslCtx = sslCtx;
this.nettyClientHandler = nettyClientHandler;
}

@Override
public void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// Enable HTTPS if necessary.
if (sslCtx != null) {
channelPipeline.addLast(sslCtx.newHandler(socketChannel.alloc()));
}
channelPipeline.addLast(new HttpClientCodec());
channelPipeline.addLast(nettyClientHandler);
}
}

18.Create NettyClient Class.

Create Class NettyClient in the NettyHttpClient Project. This class provides a client based on asynchronous non-blocking I/O (NIO) API of Netty. All of the NewsSources are executed with the same Connection ChannelFuture. This ChannelFuture is added with ChannelFutureListener for each source.Then writeAndFlush call with the Prepared HttpRequest is made from the ChannelFuture channel. Please Refer these links for some more information Netty WikiPedia , Netty User Guide and Netty Examples

 
public class NettyClient extends NewsChannelMessagesImpl {
private static final int ARRAY_FIRST_ELEMENT_INDEX = 0;
private static final Logger log = LoggerFactory.getLogger(NettyClient.class);
private static ChannelFuture clientChannelFuture;


public void executeNettyClient(String[] newsSources) throws Exception {
URI uri = new URI(newsSources[ARRAY_FIRST_ELEMENT_INDEX]);
String host = uri.getHost();
int port = uri.getPort();
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootStrap = new Bootstrap(); 
try {
makeCall(newsSources, uri, host, port, group, bootStrap);
if (!getClientChannelFuture().channel().closeFuture().await(5000)) {
log.info("The request timed out.");
}
log.info("Done");
} finally {
group.shutdownGracefully();
}
}

private void makeCall(String[] newsSources, URI uri, String host, int port, EventLoopGroup group, Bootstrap bootStrap) throws URISyntaxException {
bootStrap.group(group).channel(NioSocketChannel.class).handler(new NettyClientInitializer(null, getNettyClientHandler()));
// Make the connection attempt.
clientChannelFuture = bootStrap.connect(host, port);
IntStream.range(0, newsSources.length).forEachOrdered(newsSource -> {
boolean isClosingSource = (newsSource == newsSources.length - 1) ? true : false;
try {
makeSourceCall(clientChannelFuture, newsSources[newsSource], isClosingSource);
} catch (Exception e) {
log.info("Error at Make source call", e);
}
});
}


public static ChannelFuture getClientChannelFuture() {
return clientChannelFuture;
}

public static void setClientChannelFuture(ChannelFuture clientChannelFuture) {
NettyClient.clientChannelFuture = clientChannelFuture;
}

private void makeSourceCall(ChannelFuture callChannelfuture, String url, boolean isClosingSource) throws URISyntaxException {
URI uri = new URI(url);
String host = uri.getHost();
callChannelfuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
future.cause().printStackTrace();
return;
}
log.info("Prepare the HTTP request");
HttpRequest callRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
callRequest.headers().set(HttpHeaders.Names.HOST, host);
if (isClosingSource) {
callRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
} else {
callRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CONTINUE);
}
callRequest.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
future.channel().writeAndFlush(callRequest);
}
});
}
}


19.Create Integration tests for NettyClient.

Create tests for NettyClient covering a Basic asynchronous client call via the Netty Channel and checking reliability of the call by calling it multiple times.


public class NettyClientTest extends TestCase {
private static final String NEWSSOURCE_PROGRESSIVENEWS_URL = "http://localhost:8080/RestServer/feeds/newssource/progressivenews/";
private static final String NEWSSOURCE_POSITIVENEWS_URL = "http://localhost:8080/RestServer/feeds/newssource/positivenews/";
private ThreadLocal<NettyClient> nettyClient;
private static final Logger log = LoggerFactory.getLogger(NettyClientTest.class);

public void setUp() {
nettyClient = ThreadLocal.withInitial(NettyClient::new);
nettyClient.get().getNewsChannels().clear();
}

@Test
public void testExecuteNettyClient() throws Exception {
nettyClient.get().executeNettyClient(new String[] { NEWSSOURCE_PROGRESSIVENEWS_URL, NEWSSOURCE_POSITIVENEWS_URL });
assertEquals(2, nettyClient.get().getNewsChannels().size());
assertTitle(nettyClient.get().getNewsChannels(), "Progressive News");
assertTitle(nettyClient.get().getNewsChannels(), "Positive News");
}

@Test
public void testExecuteNettyClient50() throws Exception {
IntStream.range(0, 50).forEach(i ->{
log.info("NettyClientCall: "+i);
setUp();
try {
testExecuteNettyClient();
} catch (Exception e) {
log.info("NettyClientCall50", e);
fail("NettyClientCall50: " + e.getMessage());
}
});
}

private void assertTitle(List<NewsChannel> newsChannelList, String title) {
boolean isFound = false;
if(newsChannelList.parallelStream().filter(newsChannel -> newsChannel.getTitle().equals(title)).count() == 1L){
isFound = true;
}
assertTrue(title, isFound);
}
}


20.Run the Junit Tests and Check the data.

Now the setup is ready to execute unit tests and integration tests for the modules AsyncClient,NettyHttpClient and RestServer upon building the parent project Java8AsynchronousExample from the commandline.Go to the path of parent project Java8AsynchronousExample and use the command mvn clean install eclipse:clean eclipse:eclipse -Dwtpversion=2.0.This would build both Parent Project and the Modules within the parent Projects.The RestServer.war is created in the target of RestServer Module which is then used as context to execute the integration tests of both the Modules AsyncClient and NettyHttpClient Client. To execute both the Clients Integration Tests from eclipse, have m2e plugin installed on your eclipse and create a m2 Maven Build configuration with Goal verify integration-test and execute the Integration Tests. Now the parent Project is Build along with Executions of Tests successfully.

Building Projects with Multiple Modules

Building Projects with Multiple Modules

AsyncClient Test Results

AsyncClient Test Results

NettyClient Test Results

NettyClient Test Results

Starting RestServer on AsynClient Module

Starting RestServer on AsynClient Module

Starting RestServer on NettyHttpClient Module

Starting RestServer on AsynClient Module

Build Summary

Build Summary Java8AsynchronousExample

AsyncClient Request Response Logging

AsyncClient Request Response Logging

NettyClient Request Response Logging

NettyClient Request Response Logging

21.Download the Example.