Archive

Archive for the ‘BigData’ Category

Sending JSON via NodeJS to Multiple Topics in Kafka.

February 7, 2015 Leave a comment
What are we trying to achieve?
  1. Send JSON from a browser/curl to nodejs.
  2. nodejs routes the JSON, based on the URL, to the corresponding topic in kafka. Example : the URL /upload/topic/A sends the JSON to topic_a in kafka (see the sketch under Step 1 below).
  3. Further processing is done on kafka.
  4. We can then see the JSON arrive in kafka, using the kafka-console-consumer.sh script.

Step 1 : Get the json_nodejs_multiple_topics.js script from git (the repo is linked at the end of this post).
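
A minimal sketch of the routing logic, assuming kafka-node and a Zookeeper reachable at kafka:2181 (as in the other posts in this category), could look like this; the real script in the repo is the reference:

 // Sketch only (assumptions above): route JSON POSTed to
// /upload/topic/<X> to the kafka topic topic_<x>.
var http = require('http');
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);

// Only send once the producer has connected.
var ready = false;
producer.on('ready', function () { ready = true; });
producer.on('error', function (err) { console.log(err); });

// Map URL paths to kafka topics; anything else is rejected.
var routes = {
    '/upload/topic/A': 'topic_a',
    '/upload/topic/B': 'topic_b',
    '/upload/topic/C': 'topic_c'
};

http.createServer(function (request, response) {
    response.writeHeader(200, {"Content-Type": "application/json"});
    request.on('data', function (chunk) {
        var json = chunk.toString('utf8');
        var topic = routes[request.url];
        if (!topic || !ready) {
            console.log('ERROR: Could not Process this URL :' + request.url);
            console.log(json);
            return;
        }
        console.log('For Topic ' + request.url.slice(-1));
        console.log(json);
        producer.send([{ topic: topic, messages: json, partition: 0 }],
            function (err, data) { console.log(data); });
    });
    response.end();
}).listen(8125);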

Step 2 : Start the above script on the nodejs server.

 [nodejs-admin@nodejs nodejs]$ vim json_nodejs_multiple_topics.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_multiple_topics.js

Step 3 : Execute the below curl commands to send JSON to nodejs.

 [nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload/topic/A
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"abc","password":"xyz"}' http://localhost:8125/upload/topic/B
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"efg","password":"xyz"}' http://localhost:8125/upload/topic/C
[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"efg","password":"xyz"}' http://localhost:8125/upload/topic/D

Step 4 : Output on nodejs console

 [nodejs-admin@nodejs nodejs]$ node json_nodejs_multiple_topics.js
For Topic A
{"username":"xyz","password":"xyz"}
{ topic_a: { '0': 16 } }
For Topic B
{"username":"abc","password":"xyz"}
{ topic_b: { '0': 1 } }
For Topic C
{"username":"efg","password":"xyz"}
{ topic_c: { '0': 0 } }
ERROR: Could not Process this URL :/upload/topic/D
{"username":"efg","password":"xyz"}
{"username":"xyz","password":"xyz"} request from the curl command.
{ topic_a: { '0': 16 } } response from the kafka cluster that, it has received the json.

Step 5 : Output on the kafka consumer side.

NOTE : This assumes the topics have already been created in kafka, using the below commands.
 [kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_a
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_b
[kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_c
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
topic_a
topic_b
topic_c
[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$
Here is the output on each consumer after running the curl commands against the nodejs server.
 [kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_a --from-beginning
{"username":"xyz","password":"xyz"}

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_b --from-beginning
{"username":"abc","password":"xyz"}

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_c --from-beginning
{"username":"efg","password":"xyz"}
{"username":"xyz","password":"xyz"} data received from nodejs server.
{"username":"abc","password":"xyz"} data received from nodejs server.
{"username":"efg","password":"xyz"} data received from nodejs server.
 http://whatizee.blogspot.in/2015/02/installing-kafka-single-node-quick-start.html
http://whatizee.blogspot.in/2015/02/installing-nodejs-on-centos-66.html
http://whatizee.blogspot.in/2015/02/sending-json-nodejs-kafka.html
https://github.com/zubayr/kafka-nodejs/blob/master/send_json_multiple_topics/README.md
Categories: BigData, HOWTOs

Sending JSON -> NodeJS -> Kafka.

February 6, 2015 Leave a comment
What are we trying to achieve?
  1. Send JSON from a browser/curl to nodejs.
  2. nodejs forwards the JSON data to kafka.
  3. Further processing is done on kafka.
  4. We can then see the JSON arrive on the kafka-console-consumer.sh script.

Step 1 : Create a script called json_nodejs_kafka.js with the code below.

/*
Getting some 'http' power.
*/
var http = require('http');

/*
Let's create a server to wait for requests at:
http://localhost:8125/upload
*/
http.createServer(function (request, response)
{
    /*
    Making sure we respond with JSON.
    */
    response.writeHeader(200, {"Content-Type": "application/json"});

    /*
    request.on waits for data to arrive.
    */
    request.on('data', function (chunk)
    {
        /*
        CHUNK is what we receive from the client.
        For our request we are assuming it is going to be JSON data.
        We print it here on the console.
        */
        console.log(chunk.toString('utf8'))

        /*
        Using kafka-node - really nice library.
        Create a producer and connect to a Zookeeper to send the payloads.
        */
        var kafka = require('kafka-node'),
            Producer = kafka.Producer,
            client = new kafka.Client('kafka:2181'),
            producer = new Producer(client);

        /*
        Creating a payload, which takes the below information:
        'topic'     --> the topic we have created in kafka.
        'messages'  --> the data which needs to be sent to kafka (JSON in our case).
        'partition' --> which partition we should send the request to.
        If there are multiple partitions, we can optimize the code here
        so that we send requests to different partitions.
        */
        var payloads = [
            { topic: 'test', messages: chunk.toString('utf8'), partition: 0 },
        ];

        /*
        Once the producer is 'ready', send the payload to kafka.
        */
        producer.on('ready', function(){
            producer.send(payloads, function(err, data){
                console.log(data)
            });
        });

        /*
        In case we hit an error.
        */
        producer.on('error', function(err){})

    });

    /*
    End of request.
    */
    response.end();

/*
Listen on port 8125.
*/
}).listen(8125);
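
The payload comment above mentions optimizing across multiple partitions. A hypothetical round-robin sketch (assuming the topic had been created with --partitions 2, which is not the case in this post) would be:

 var nextPartition = 0;
function buildPayload(json) {
    // Alternate between partition 0 and partition 1 on each call.
    nextPartition = (nextPartition + 1) % 2;
    return [{ topic: 'test', messages: json, partition: nextPartition }];
}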

Step 2 : Start the above script on the nodejs server.

[nodejs-admin@nodejs nodejs]$ vim json_nodejs_kafka.js
[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js

Step 3 : Execute the below curl command to send the JSON to nodejs.

[nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload

Step 4 : Output on nodejs console

[nodejs-admin@nodejs nodejs]$ node json_nodejs_kafka.js 
{"username":"xyz","password":"xyz"}
{ test: { '0': 29 } }
{"username":"xyz","password":"xyz"} request from the curl command.
{ test: { '0': 29 } } response from the kafka cluster that, it has received the json.

Step 5 : Output on the kafka consumer side.

[kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
{"username":"xyz","password":"xyz"}
{"username":"xyz","password":"xyz"} data received from nodejs server.
Categories: BigData, HOWTOs

NodeJS Kafka Producer – Using kafka-node

February 6, 2015 Leave a comment
Now that we have Kafka and NodeJS ready, let's send some data to our Kafka cluster.
Below is a basic producer code.
Below are the server details:
  1. nodejs is the nodejs server.
  2. kafka is the kafka server (single node).

Step 1 : Copy the below script into a file called producer_nodejs.js.

 /*
Basic producer to send data to kafka from nodejs.
More Information Here : https://www.npmjs.com/package/kafka-node
*/

// Using kafka-node - really nice library.
// Create a producer and connect to a Zookeeper to send the payloads.
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.Client('kafka:2181'),
    producer = new Producer(client);

/*
Creating a payload, which takes the below information:
'topic'     --> the topic we have created in kafka (test).
'messages'  --> the data which needs to be sent to kafka (JSON in our case).
'partition' --> which partition we should send the request to (default).

Example command to create a topic in kafka:
[kafka@kafka kafka]$ bin/kafka-topics.sh \
    --create --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic test

If there are multiple partitions, we can optimize the code here
so that we send requests to different partitions.
*/
var payloads = [
    { topic: 'test', messages: 'This is the First Message I am sending', partition: 0 },
];

// Once the producer is 'ready', send the payload to kafka.
producer.on('ready', function(){
    producer.send(payloads, function(err, data){
        console.log(data)
    });
});

// In case we hit an error.
producer.on('error', function(err){});

Step 2 : Start the kafka cluster as we already did in the Kafka quick-start post, and make sure the topic test exists.

Step 3 : Start the console consumer with the below command.

 [kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Step 4 : Execute the below command. This will send the message This is the First Message I am sending to the Kafka consumer.

 [nodejs-admin@nodejs nodejs]$ node producer_nodejs.js

Step 5 : Check the consumer; you will see the message sent from nodejs.

 [kafka-admin@kafka kafka_2.9.2-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message here
This is the First Message I am sending
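
If you would rather consume from nodejs as well, kafka-node also ships a Consumer. A minimal sketch, assuming the same Zookeeper at kafka:2181 and the test topic as above, would be:

 var kafka = require('kafka-node'),
    client = new kafka.Client('kafka:2181'),
    consumer = new kafka.Consumer(client, [{ topic: 'test', partition: 0 }], { autoCommit: true });

// Print every message value as it arrives.
consumer.on('message', function (message) {
    console.log(message.value);
});
consumer.on('error', function (err) {
    console.log(err);
});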
Categories: BigData, HOWTOs

Installing NodeJS on CentOS 6.6.

February 6, 2015 Leave a comment

Installing nodejs and npm on CentOS is very simple.

 [nodejs-admin@nodejs ~]$ sudo su
[nodejs-admin@nodejs ~]# curl -sL https://rpm.nodesource.com/setup | bash -
[nodejs-admin@nodejs ~]# yum install -y nodejs

Installing gcc-c++ and make.

 [nodejs-admin@nodejs ~]$ sudo yum install gcc-c++ make
[sudo] password for nodejs-admin:
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Loading mirror speeds from cached hostfile
* base: mirrors.123host.vn
* epel: ftp.cuhk.edu.hk
* extras: centos-hn.viettelidc.com.vn
* updates: mirrors.vonline.vn
Package 1:make-3.81-20.el6.x86_64 already installed and latest version
Resolving Dependencies
...

Complete!

Later on we will need kafka-node, so let's install that as well.

 [nodejs-admin@nodejs ~]$ sudo npm install kafka-node
[sudo] password for nodejs-admin:

> snappy@3.0.6 install /home/nodejs-admin/node_modules/kafka-node/node_modules/snappy
> node-gyp rebuild

gyp WARN EACCES user "root" does not have permission to access the dev dir "/root/.node-gyp/0.10.36"
gyp WARN EACCES attempting to reinstall using temporary dev dir "/home/nodejs-admin/node_modules/kafka-node/node_modules/snappy/.node-gyp"
make: Entering directory `/home/nodejs-admin/node_modules/kafka-node/node_modules/snappy/build'
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-sinksource.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy-stubs-internal.o
CXX(target) Release/obj.target/snappy/deps/snappy/snappy-1.1.2/snappy.o
AR(target) Release/obj.target/deps/snappy/snappy.a
COPY Release/snappy.a
CXX(target) Release/obj.target/binding/src/binding.o
SOLINK_MODULE(target) Release/obj.target/binding.node
SOLINK_MODULE(target) Release/obj.target/binding.node: Finished
COPY Release/binding.node
make: Leaving directory `/home/nodejs-admin/node_modules/kafka-node/node_modules/snappy/build'
kafka-node@0.2.18 node_modules/kafka-node
├── buffer-crc32@0.2.5
├── retry@0.6.1
├── node-uuid@1.4.1
├── async@0.7.0
├── lodash@2.2.1
├── debug@2.1.1 (ms@0.6.2)
├── binary@0.3.0 (buffers@0.1.1, chainsaw@0.1.0)
├── node-zookeeper-client@0.2.0 (async@0.2.10, underscore@1.4.4)
├── buffermaker@1.2.0 (long@1.1.2)
└── snappy@3.0.6 (bindings@1.1.1, nan@1.5.3)
[nodejs-admin@nodejs ~]$ ls

Let's do a test.

Create a script called example.js with the below code.
 var http = require('http');
http.createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/plain'});
    res.end('Hello World\n');
}).listen(1337, '127.0.0.1');
console.log('Server running at http://127.0.0.1:1337/');
Let's start the server in a terminal.
 [nodejs-admin@nodejs nodejs]$ node example.js 
Server running at http://127.0.0.1:1337/
Hit the URL from a browser and we can see Hello World.
So we are all set.
NodeJS is ready.
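
If there is no browser handy on the box, the same check works with curl from another terminal:

 [nodejs-admin@nodejs ~]$ curl http://127.0.0.1:1337/
Hello World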

Let's make some simple changes to the existing script to handle JSON.

Here is a simple script to handle JSON data; save it as node_recv_json.js.
 //    Getting some 'http' power.
var http = require('http');

// Let's create a server to wait for requests at:
// http://localhost:8125/upload
http.createServer(function (request, response)
{
    // Making sure we respond with JSON.
    response.writeHeader(200, {"Content-Type": "application/json"});

    // request.on waits for data to arrive.
    request.on('data', function (chunk)
    {
        // CHUNK is what we receive from the client.
        // For our request we are assuming it is going to be JSON data.
        // We print it here on the console.
        console.log(chunk.toString('utf8'))
    });
    // End of request.
    response.end();
    // Listen on port 8125.
}).listen(8125);
Let's fire up the script.
 [nodejs-admin@nodejs nodejs]$ node node_recv_json.js 
On a new terminal, send a request to our script. Our script is listening on port 8125.
 [nodejs-admin@nodejs nodejs]$ curl -H "Content-Type: application/json" -d '{"username":"xyz","password":"xyz"}' http://localhost:8125/upload
You will see the message received on the script terminal.
 [nodejs-admin@nodejs nodejs]$ node node_recv_json.js 
{"username":"xyz","password":"xyz"}
Now we are all set to do some R&D.
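
To actually use the JSON rather than just print it, the data handler would typically parse it first. A small variation of the handler (the username field comes from the curl example above):

 request.on('data', function (chunk) {
    // Parse guardedly: clients may send malformed JSON.
    try {
        var obj = JSON.parse(chunk.toString('utf8'));
        console.log('username: ' + obj.username);
    } catch (e) {
        console.log('Not valid JSON: ' + e.message);
    }
});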
Categories: BigData, HOWTOs

Installing KAFKA Single Node – Quick Start.

February 6, 2015 Leave a comment

Download and Extract

Download the tgz file and extract it.
 [kafka-admin@kafka Downloads]$ ls
jdk-7u75-linux-x64.rpm kafka_2.9.2-0.8.2.0.tgz
[kafka-admin@kafka Downloads]$ sudo rpm -ivh jdk-7u75-linux-x64.rpm
...
[kafka-admin@kafka Downloads]$ sudo tar -xzf kafka_2.9.2-0.8.2.0.tgz -C /opt
[kafka-admin@kafka Downloads]$ cd /opt
[kafka-admin@kafka opt]$ sudo ln -s kafka_2.9.2-0.8.2.0 kafka
[kafka-admin@kafka opt]$ ls
kafka kafka_2.9.2-0.8.2.0
[kafka-admin@kafka opt]$ sudo chown -R kafka-admin:kafka-admin kafka

Now we are ready to start all the services required.

 [kafka-admin@kafka opt]$ cd kafka
[kafka-admin@kafka kafka]$ ls
bin config libs LICENSE logs NOTICE
[kafka-admin@kafka kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties
This will start a zookeeper on localhost, port 2181. This configuration can be changed in the config/zookeeper.properties file. NOTE : If you want to run zookeeper on a separate machine, make the corresponding change in config/server.properties so that the kafka server points to the correct zookeeper; by default it points to localhost:2181.
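
The relevant line in config/server.properties is the zookeeper.connect setting, for example:

 zookeeper.connect=localhost:2181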

Next we start the kafka server.

 [kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server.properties
NOTE : If you want to start multiple brokers, make a copy of the server.properties file for each one and change the below settings.
  1. broker.id is the unique identifier for the broker.
  2. port is where this broker is going to listen.
  3. log.dir is where the broker writes its log.
 config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
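
Each additional broker is then started with its own properties file, for example:

 [kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server-1.properties
[kafka-admin@kafka kafka]$ bin/kafka-server-start.sh config/server-2.properties
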
Now our server has started; let's assume we start only one server.

Creating Topics

To create a topic, execute the below command; this creates a topic with a single partition.
 [kafka-admin@kafka kafka]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
To check the topics currently available, execute the below command.
 [kafka-admin@kafka kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
[kafka-admin@kafka kafka]$
We see that currently we have only one topic. Now we are all set to send and receive messages.

Send some messages

Open up a new terminal and fire up the Kafka producer script as below, then start typing some messages. Each newline (\n or CR) marks the end of a message.
 [kafka-admin@kafka kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is a message2

Start a Consumer

Open a new terminal and start the consumer.
The option --from-beginning will give all the messages from the beginning of the topic. You will see the 2 messages we typed above, This is a message and This is a message2.
 [kafka-admin@kafka kafka]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is a message2
Our single node Kafka cluster is ready.
Categories: BigData, HOWTOs

Welcome to AWS Cloud

February 25, 2013 Leave a comment

Big data technologies let you work with any velocity, volume, or variety of data in a highly productive environment. This session seeks to answer questions such as “what is big data,” “how can I use unstructured data,” and “how can I integrate data collections from different sources” using Hadoop with Amazon Elastic MapReduce.

FTP Client Using the Apache Commons Net Library over a SOCKS Proxy Server

August 2, 2012 Leave a comment
As we have the FTP server code in a previous post, here is an FTP client using the Apache Commons Net library. Below is basic code which we can use to connect to our FTP server through a SOCKS proxy server. (This assumes you already have a SOCKS proxy server running; if not, the code can be used just as an FTP client connecting directly to the FTP server.)

The libraries required and the parameters passed from build.xml are shown below (this can be done over the command line as well). The change required to route all packets through the proxy server is the pair of socksProxyHost / socksProxyPort system properties, which are set in the code before connecting.

Libraries Required.
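
For this client the only third-party jar needed is Apache Commons Net. Assuming version 3.1 sitting in a lib directory, the classpath entry in build.xml would look something like:

 <classpath>
     <pathelement location="lib/commons-net-3.1.jar"/>
 </classpath>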

       
       
       
   

Sample Arg in Build.xml File
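
A sample argument, matching the usage string in the code below; the SOCKS proxy address and the password here are placeholders, while the FTP server address and user match the sample output at the end of this post:

 <arg line="-l -PrH 127.0.0.1:1080 127.0.0.1:8080 ahmed password ."/>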

   
       
               
       
   

Code Below : the FTP client, with minor changes to the example code found on the Apache Commons Net website.

package com.ftp.client;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;

import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPHTTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPConnectionClosedException;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.ftp.FTPSClient;
import org.apache.commons.net.io.CopyStreamEvent;
import org.apache.commons.net.io.CopyStreamListener;
import org.apache.commons.net.util.TrustManagerUtils;

public final class FtpClient {

    public static final String USAGE = "Usage: ftp [options] <hostname> <username> <password> [<remote file> [<local file>]]\n"
            + "\nDefault behavior is to download a file and use ASCII transfer mode.\n"
            + "\t-a - use local active mode (default is local passive)\n"
            + "\t-b - use binary transfer mode\n"
            + "\t-c cmd - issue arbitrary command (remote is used as a parameter if provided) \n"
            + "\t-d - list directory details using MLSD (remote is used as the pathname if provided)\n"
            + "\t-e - use EPSV with IPv4 (default false)\n"
            + "\t-f - issue FEAT command (remote and local files are ignored)\n"
            + "\t-h - list hidden files (applies to -l and -n only)\n"
            + "\t-k secs - use keep-alive timer (setControlKeepAliveTimeout)\n"
            + "\t-l - list files using LIST (remote is used as the pathname if provided)\n"
            + "\t-L - use lenient future dates (server dates may be up to 1 day into future)\n"
            + "\t-n - list file names using NLST (remote is used as the pathname if provided)\n"
            + "\t-p true|false|protocol[,true|false] - use FTPSClient with the specified protocol and/or isImplicit setting\n"
            + "\t-s - store file on server (upload)\n"
            + "\t-t - list file details using MLST (remote is used as the pathname if provided)\n"
            + "\t-w msec - wait time for keep-alive reply (setControlKeepAliveReplyTimeout)\n"
            + "\t-T  all|valid|none - use one of the built-in TrustManager implementations (none = JVM default)\n"
            + "\t-PrH server[:port] - SOCKS Proxy host and optional port[1080] \n"
            + "\t-PrU user - SOCKS Proxy server username\n"
            + "\t-PrP password - SOCKS Proxy server password\n"
            + "\t-# - add hash display during transfers\n";

    public static final void main(String[] args) {
        boolean storeFile = false, binaryTransfer = false, error = false, listFiles = false, listNames = false, hidden = false;
        boolean localActive = false, useEpsvWithIPv4 = false, feat = false, printHash = false;
        boolean mlst = false, mlsd = false;
        boolean lenient = false;
        long keepAliveTimeout = -1;
        int controlKeepAliveReplyTimeout = -1;
        int minParams = 5; // listings require 3 params
        String protocol = null; // SSL protocol
        String doCommand = null;
        String trustmgr = null;
        String proxyHost = null;
        String proxyPort = "1080";
        String proxyUser = null;
        String proxyPassword = null;

        int base = 0;
        for (base = 0; base < args.length; base++) {
            if (args[base].equals("-s")) {
                storeFile = true;
            } else if (args[base].equals("-a")) {
                localActive = true;
            } else if (args[base].equals("-b")) {
                binaryTransfer = true;
            } else if (args[base].equals("-c")) {
                doCommand = args[++base];
                minParams = 3;
            } else if (args[base].equals("-d")) {
                mlsd = true;
                minParams = 3;
            } else if (args[base].equals("-e")) {
                useEpsvWithIPv4 = true;
            } else if (args[base].equals("-f")) {
                feat = true;
                minParams = 3;
            } else if (args[base].equals("-h")) {
                hidden = true;
            } else if (args[base].equals("-k")) {
                keepAliveTimeout = Long.parseLong(args[++base]);
            } else if (args[base].equals("-l")) {
                listFiles = true;
                minParams = 3;
            } else if (args[base].equals("-L")) {
                lenient = true;
            } else if (args[base].equals("-n")) {
                listNames = true;
                minParams = 3;
            } else if (args[base].equals("-p")) {
                protocol = args[++base];
            } else if (args[base].equals("-t")) {
                mlst = true;
                minParams = 3;
            } else if (args[base].equals("-w")) {
                controlKeepAliveReplyTimeout = Integer.parseInt(args[++base]);
            } else if (args[base].equals("-T")) {
                trustmgr = args[++base];
            } else if (args[base].equals("-PrH")) {
                proxyHost = args[++base];
                String parts[] = proxyHost.split(":");
                if (parts.length == 2) {
                    proxyHost = parts[0];
                    proxyPort = parts[1];
                    System.out.println("Proxy Host : " + proxyHost);
                    System.out.println("Proxy Port : " + proxyPort);
                }
            } else if (args[base].equals("-PrU")) {
                proxyUser = args[++base];
            } else if (args[base].equals("-PrP")) {
                proxyPassword = args[++base];
            } else if (args[base].equals("-#")) {
                printHash = true;
            } else {
                break;
            }
        }

        int remain = args.length - base;
        if (remain < minParams) // server, user, pass, remote, local [protocol]
        {
            System.err.println(USAGE);
            System.exit(1);
        }

        String server = args[base++];
        int port = 0;
        String parts[] = server.split(":");
        if (parts.length == 2) {
            server = parts[0];
            port = Integer.parseInt(parts[1]);
        }
        String username = args[base++];
        String password = args[base++];

        String remote = null;
        if (args.length - base > 0) {
            remote = args[base++];
        }

        String local = null;
        if (args.length - base > 0) {
            local = args[base++];
        }

        final FTPClient ftp;
        if (protocol == null) {
            if (proxyHost != null) {
                System.out.println("Using SOCKS proxy server: " + proxyHost);
                System.setProperty("socksProxyHost", proxyHost);
                System.setProperty("socksProxyPort", proxyPort);
                ftp = new FTPClient();

            } else {
                ftp = new FTPClient();
            }
        } else {
            FTPSClient ftps;
            if (protocol.equals("true")) {
                ftps = new FTPSClient(true);
            } else if (protocol.equals("false")) {
                ftps = new FTPSClient(false);
            } else {
                String prot[] = protocol.split(",");
                if (prot.length == 1) { // Just protocol
                    ftps = new FTPSClient(protocol);
                } else { // protocol,true|false
                    ftps = new FTPSClient(prot[0],
                            Boolean.parseBoolean(prot[1]));
                }
            }
            ftp = ftps;
            System.out.println(" Secure Connection ");
            if ("all".equals(trustmgr)) {
                ftps.setTrustManager(TrustManagerUtils
                        .getAcceptAllTrustManager());
            } else if ("valid".equals(trustmgr)) {
                ftps.setTrustManager(TrustManagerUtils
                        .getValidateServerCertificateTrustManager());
            } else if ("none".equals(trustmgr)) {
                ftps.setTrustManager(null);
            }
        }

        if (printHash) {
            ftp.setCopyStreamListener(createListener());
        }
        if (keepAliveTimeout >= 0) {
            ftp.setControlKeepAliveTimeout(keepAliveTimeout);
        }
        if (controlKeepAliveReplyTimeout >= 0) {
            ftp.setControlKeepAliveReplyTimeout(controlKeepAliveReplyTimeout);
        }
        ftp.setListHiddenFiles(hidden);

        // suppress login details
        ftp.addProtocolCommandListener(new PrintCommandListener(
                new PrintWriter(System.out), true));

        try {
            int reply;
            if (port > 0) {
                ftp.connect(server, port);
            } else {
                ftp.connect(server);
            }
            System.out.println("Connected to " + server + " on "
                    + (port > 0 ? port : ftp.getDefaultPort()));

            // After connection attempt, you should check the reply code to
            // verify
            // success.
            reply = ftp.getReplyCode();

            if (!FTPReply.isPositiveCompletion(reply)) {
                ftp.disconnect();
                System.err.println("FTP server refused connection.");
                System.exit(1);
            }
        } catch (IOException e) {
            if (ftp.isConnected()) {
                try {
                    ftp.disconnect();
                } catch (IOException f) {
                    // do nothing
                }
            }
            System.err.println("Could not connect to server.");
            e.printStackTrace();
            System.exit(1);
        }

        __main: try {
            if (!ftp.login(username, password)) {
                ftp.logout();
                error = true;
                break __main;
            }

            System.out.println("Remote system is " + ftp.getSystemType());

            if (binaryTransfer) {
                ftp.setFileType(FTP.BINARY_FILE_TYPE);
               
            }

            // Use passive mode as default because most of us are
            // behind firewalls these days.
            if (localActive) {
                ftp.enterLocalActiveMode();
                //ftp.setFileTransferMode(FTP.COMPRESSED_TRANSFER_MODE);
                System.out.println("Compress Active");
               
            } else {
                ftp.enterLocalPassiveMode();
                //ftp.setFileTransferMode(FTP.COMPRESSED_TRANSFER_MODE);
                System.out.println("Compress Passive");
            }

            ftp.setUseEPSVwithIPv4(useEpsvWithIPv4);

            if (storeFile) {
                InputStream input;

                input = new FileInputStream(local);

                ftp.storeFile(remote, input);

                input.close();
            } else if (listFiles) {
                if (lenient) {
                    FTPClientConfig config = new FTPClientConfig();
                    config.setLenientFutureDates(true);
                    ftp.configure(config);
                }

                for (FTPFile f : ftp.listFiles(remote)) {
                    System.out.println(f.getRawListing());
                    System.out.println(f.toFormattedString());
                }
            } else if (mlsd) {
                for (FTPFile f : ftp.mlistDir(remote)) {
                    System.out.println(f.getRawListing());
                    System.out.println(f.toFormattedString());
                }
            } else if (mlst) {
                FTPFile f = ftp.mlistFile(remote);
                if (f != null) {
                    System.out.println(f.toFormattedString());
                }
            } else if (listNames) {
                for (String s : ftp.listNames(remote)) {
                    System.out.println(s);
                }
            } else if (feat) {
                // boolean feature check
                if (remote != null) { // See if the command is present
                    if (ftp.hasFeature(remote)) {
                        System.out.println("Has feature: " + remote);
                    } else {
                        if (FTPReply.isPositiveCompletion(ftp.getReplyCode())) {
                            System.out.println("FEAT " + remote
                                    + " was not detected");
                        } else {
                            System.out.println("Command failed: "
                                    + ftp.getReplyString());
                        }
                    }

                    // Strings feature check
                    String[] features = ftp.featureValues(remote);
                    if (features != null) {
                        for (String f : features) {
                            System.out
                            .println("FEAT " + remote + "=" + f + ".");
                        }
                    } else {
                        if (FTPReply.isPositiveCompletion(ftp.getReplyCode())) {
                            System.out.println("FEAT " + remote
                                    + " is not present");
                        } else {
                            System.out.println("Command failed: "
                                    + ftp.getReplyString());
                        }
                    }
                } else {
                    if (ftp.features()) {
                        // Command listener has already printed the output
                    } else {
                        System.out.println("Failed: " + ftp.getReplyString());
                    }
                }
            } else if (doCommand != null) {
                if (ftp.doCommand(doCommand, remote)) {
                    // Command listener has already printed the output
                    // for(String s : ftp.getReplyStrings()) {
                    // System.out.println(s);
                    // }
                } else {
                    System.out.println("Failed: " + ftp.getReplyString());
                }
            } else {
                OutputStream output;

                output = new FileOutputStream(local);

                ftp.retrieveFile(remote, output);

                output.close();
            }

            ftp.noop(); // check that control connection is working OK

            ftp.logout();
        } catch (FTPConnectionClosedException e) {
            error = true;
            System.err.println("Server closed connection.");
            e.printStackTrace();
        } catch (IOException e) {
            error = true;
            e.printStackTrace();
        } finally {
            if (ftp.isConnected()) {
                try {
                    ftp.disconnect();
                } catch (IOException f) {
                    // do nothing
                }
            }
        }

        System.exit(error ? 1 : 0);
    } // end main

    private static CopyStreamListener createListener() {
        return new CopyStreamListener() {
            private long megsTotal = 0;

            public void bytesTransferred(CopyStreamEvent event) {
                bytesTransferred(event.getTotalBytesTransferred(),
                        event.getBytesTransferred(), event.getStreamSize());
            }

            public void bytesTransferred(long totalBytesTransferred,
                    int bytesTransferred, long streamSize) {
                long megs = totalBytesTransferred / 1000000;
                for (long l = megsTotal; l < megs; l++) {
                    System.err.print("#");
                }
                megsTotal = megs;
            }
        };
    }
}

1. Run the code by creating an Eclipse project.
2. Add the required libraries to the project.
3. Export it as an Ant build, which will create a build.xml.
4. Update the build.xml with the above argument (or as per your requirement).
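
If you would rather skip Ant, the equivalent command line would be something like the below (the jar version, proxy address and password are assumed; the FTP server and user match the sample output that follows):

 java -cp .:lib/commons-net-3.1.jar com.ftp.client.FtpClient -l -PrH 127.0.0.1:1080 127.0.0.1:8080 ahmed password .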

This should give you output similar to the below.

[java] Connected to 127.0.0.1 on 8080
     [java] USER *******
     [java] 331 User name okay, need password for ahmed.
     [java] PASS *******
     [java] 230 User logged in, proceed.
     [java] SYST
     [java] 215 UNIX Type: Apache FtpServer
     [java] Remote system is UNIX Type: Apache FtpServer
     [java] Compress Passive
     [java] PASV
     [java] 227 Entering Passive Mode (127,0,0,1,245,156)
     [java] LIST
     [java] 150 File status okay; about to open data connection.
     [java] 226 Closing data connection.
     [java] -r--------   1 user group    9374223 Jul 25 09:38 test.sql
     [java] -r--------   1 user group    9374223 Jul 25 09:38 InforTest.sql
     [java] NOOP
     [java] 200 Command NOOP okay.
     [java] QUIT
     [java] 221 Goodbye.
BUILD SUCCESSFUL
Total time: 1 second

Categories: BigData, HOWTOs