Hadoop Consulting Experts Help In Detecting Valid, Invalid Data In Apache Pig With FILTER UDF

Plenty of articles explain the basics of Apache Pig. In this post, Hadoop consulting experts show how to detect valid and invalid data in Apache Pig with a Filter UDF.


This post walks through the steps for verifying data during data processing. Pig already provides the FILTER keyword to remove bad data. However, the validation logic is sometimes too complex to express in Pig itself, so we pass the data to a programming language to handle it. In this post, I will show how to use a Filter UDF written in Java to detect valid and invalid data.

Apache Pig is a data flow language built on top of Hadoop. It helps to extract, load, clean, process and analyze big data with MapReduce using a high-level language.

Pig makes it simple to write our own UDFs, which lets us plug custom code into a script. In real data applications, we process huge data sets every day and will run into bad records. In that case we have two choices: keep the bad data in a temporary folder so that we can investigate the cause of the error, or ignore the bad data and filter it out. This post shows how to detect valid and invalid data with a filter UDF in Pig.


Java: JDK 1.7

Cloudera version:  CDH5.4.7, please refer to this link: http://www.cloudera.com/downloads/cdh/5-4-7.html

Initial steps

  1. We need to prepare some input data and define the criteria for the filtering logic:

Use vi to create a local file named sampleData with the following content:


1990-12-13 09:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;1

1990-12-13 10:10:33;1990-12-13 09:10:33;1990-12-13 09:01:33;1

1990-12-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1

1990-15-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1

  2. We need to put the local file into the Hadoop Distributed File System (HDFS) with these commands:

hadoop fs -mkdir -p /data/mydata/sample

hadoop fs -put sampleData /data/mydata/sample

Code walk through

This is the Pig script that splits the valid and invalid data and stores them in separate locations.

Note: This Pig script compiles to a MapReduce job that stores the data to HDFS in parallel.


* Register the jar file that contains the filter UDF




* Define a UDF named verifyErrorRecord, which calls the constructor of our 
UDF class to set the date-time formats


('yyyy-MM-ddHH:mm:ss', 'yyyyMMddHHmmss');


* We load our data with four fields: rawTime (the event time), startTime, endTime and id

rawData = LOAD '/data/mydata/sample/' USING PigStorage(';')
    AS (rawTime, startTime, endTime, id);







* We call our UDF with the input fields. If the function returns true, 
we set the new column to 1; if it returns false, 
we set the new column to 0

detectErrorData = FOREACH rawData GENERATE rawTime, startTime, endTime, id,
    (verifyErrorRecord(rawTime, startTime, endTime, id) ? 1 : 0) AS recordStatus;


* We split our data into two relations in Pig with the conditions 
!= 1 and == 1 to separate bad records from valid records

SPLIT detectErrorData INTO invalidRecord IF recordStatus != 1,
    validRecord IF recordStatus == 1;


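* We store the bad records to HDFS

STORE invalidRecord INTO '/data/mydata/error' USING PigStorage(';');

* We store the good records to HDFS

STORE validRecord INTO '/data/mydata/good' USING PigStorage(';');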
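The flow of the script (tag each row with 1 or 0, then split on that flag) can be tried without a cluster. The following is a minimal sketch in plain Java under stated assumptions: the names SplitFlowSketch, RecordCheck, tag, split and DIGITS_ONLY_ID are hypothetical, and the digits-only id check is only a stand-in for the real filter UDF.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitFlowSketch {

    /** Stand-in for the filter UDF: any check that answers true (good) or false (bad). */
    interface RecordCheck {
        boolean isGood(String[] fields);
    }

    /** Hypothetical demo check: the row has four fields and the id (4th field) is all digits. */
    static final RecordCheck DIGITS_ONLY_ID = new RecordCheck() {
        public boolean isGood(String[] fields) {
            return fields.length == 4 && fields[3].matches("\\d+");
        }
    };

    /** Mirrors the FOREACH step: append 1 when the check passes, 0 otherwise. */
    static String tag(String row, RecordCheck check) {
        String[] fields = row.split(";", -1);
        int recordStatus = check.isGood(fields) ? 1 : 0;
        return row + ";" + recordStatus;
    }

    /** Mirrors the SPLIT step: route tagged rows by their trailing status flag. */
    static void split(List<String> tagged, List<String> valid, List<String> invalid) {
        for (String row : tagged) {
            if (row.endsWith(";1")) {
                valid.add(row);
            } else {
                invalid.add(row);
            }
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
            "1990-12-13 09:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;1",
            "1990-12-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1");
        List<String> tagged = new ArrayList<String>();
        for (String row : rows) {
            tagged.add(tag(row, DIGITS_ONLY_ID));
        }
        List<String> valid = new ArrayList<String>();
        List<String> invalid = new ArrayList<String>();
        split(tagged, valid, invalid);
        System.out.println("valid:   " + valid);
        System.out.println("invalid: " + invalid);
    }
}
```

Keeping the status as an extra column, rather than dropping rows with FILTER, is what lets the bad rows be stored for later investigation.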



This is the Java code of the filter UDF that validates each record:

  1. We read the input data from Pig as a Tuple.
  2. We get the four fields from the input Tuple: event time, start time, end time and id. Any additional data manipulation needed for filtering can easily be added at this step.
  3. We use the isGoodRecord method to verify the data. It only returns true or false: true marks a good record and false marks a bad record.
  4. We check the result of step 3 in the Pig script and set the value 1 for true and 0 for false.

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// commons-lang 2.x; with commons-lang3 use org.apache.commons.lang3.math.NumberUtils
import org.apache.commons.lang.math.NumberUtils;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class ErrorRecordDetection extends FilterFunc {

    // Formats passed in from the DEFINE statement of the Pig script
    public static SimpleDateFormat inputFormat = null;
    public static SimpleDateFormat outputFormat = null;

    public static Date inputStartTime = null;
    public static Date inputEndTime = null;

    public ErrorRecordDetection(String originalFormatProp, String targetFormatProp) {
        inputFormat = new SimpleDateFormat(originalFormatProp);
        outputFormat = new SimpleDateFormat(targetFormatProp);
    }

    public Boolean exec(Tuple input) throws IOException {
        // An empty tuple cannot be a good record
        if (input == null || input.size() == 0) {
            return false;
        }
        try {
            String eventTime = input.get(0) == null ? "" : input.get(0).toString().trim();
            String startTime = input.get(1) == null ? "" : input.get(1).toString().trim();
            String endTime = input.get(2) == null ? "" : input.get(2).toString().trim();
            String id = input.get(3) == null ? "" : input.get(3).toString().trim();
            return isGoodRecord(eventTime, startTime, endTime, id);
        } catch (Exception e) {
            throw new IOException("Caught exception while processing the input row.", e);
        }
    }

    private static boolean isGoodRecord(String eventTime, String startTime,
            String endTime, String cardId) throws ParseException {
        // We defined the input format at the top of this class.
        // We try to parse the date-time values; if parsing fails, the record
        // has the wrong format and we consider it a bad record.
        try {
            inputStartTime = inputFormat.parse(startTime);
            inputEndTime = inputFormat.parse(endTime);
        } catch (ParseException e) {
            return false;
        }

        // We compare start time and end time to make sure that the start time
        // is not greater than the end time.
        if (outputFormat.format(inputStartTime).compareTo(outputFormat.format(inputEndTime)) > 0) {
            return false;
        }

        // We check whether the id is a number; we only accept numeric ids.
        if (!NumberUtils.isNumber(cardId)) {
            return false;
        }

        return true;
    }
}
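The three checks can also be tried outside a cluster. The following is a minimal sketch in plain Java, not the UDF itself. The class name RecordCheckSketch and the method isGoodRow are hypothetical, the date pattern is written with an explicit space between date and time (an assumption), and a digits-only regular expression stands in for NumberUtils.isNumber, which also accepts values such as decimals.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RecordCheckSketch {

    /** Applies the date-format, start/end order and numeric-id checks to one ';'-separated row. */
    static boolean isGoodRow(String row) {
        String[] fields = row.split(";", -1);
        if (fields.length != 4) {
            return false;
        }
        // New formatter instances per call, because SimpleDateFormat is not thread-safe.
        SimpleDateFormat in = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat out = new SimpleDateFormat("yyyyMMddHHmmss");
        Date start;
        Date end;
        try {
            start = in.parse(fields[1].trim());
            end = in.parse(fields[2].trim());
        } catch (ParseException e) {
            return false; // check 1: start or end time does not match the input format
        }
        if (out.format(start).compareTo(out.format(end)) > 0) {
            return false; // check 2: start time is later than end time
        }
        // check 3 (assumption): digits only, a stricter stand-in for NumberUtils.isNumber
        return fields[3].trim().matches("\\d+");
    }

    public static void main(String[] args) {
        String[] rows = {
            "1990-12-13 09:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;1",
            "1990-12-13 10:10:33;1990-12-13 09:10:33;1990-12-13 09:01:33;1",
            "1990-12-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1",
            "1990-15-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1"
        };
        for (String row : rows) {
            System.out.println(row + ";" + (isGoodRow(row) ? 1 : 0));
        }
    }
}
```

Run against the four sample rows from the initial steps, only the first row passes all three checks. As in the UDF, the first field (the event time) is read but not validated.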






Build the Maven project to get the jar file of the UDF.

Verify the result

  1. Check the log file after the MapReduce job has finished.
  2. Use these commands to check the output data at the two output locations:

hadoop fs -cat /data/mydata/error/*

1990-12-13 10:10:33;1990-12-13 09:10:33;1990-12-13 09:01:33;1;0 => This record is invalid because the start time is later than the end time.

1990-12-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1;0 => This record is invalid because the id contains the letter ‘A’.

1990-15-13 11:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;A1;0 => Same reason as above.

19901513111033;19901513111033;19901513111035;A2;0 => This record is invalid because the date-time format is not correct.
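One detail is worth knowing when a parse failure is used to flag bad dates: SimpleDateFormat is lenient by default, so an out-of-range month such as the 15 in 1990-15-13 rolls over into the next year instead of failing. The following minimal sketch shows the difference that setLenient(false) makes. The class name StrictDateSketch and the method parses are hypothetical, and the pattern with an explicit space is an assumption.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class StrictDateSketch {

    /** Returns true when the value parses under the pattern with the given leniency. */
    static boolean parses(String value, boolean lenient) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        format.setLenient(lenient);
        try {
            format.parse(value);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String monthFifteen = "1990-15-13 11:10:33";
        // Lenient (the default): month 15 is rolled over, so the value is accepted.
        System.out.println("lenient: " + parses(monthFifteen, true));
        // Strict: out-of-range fields are rejected with a ParseException.
        System.out.println("strict:  " + parses(monthFifteen, false));
        // A value in the compact layout fails in both modes.
        System.out.println("compact: " + parses("19901513111033", false));
    }
}
```

If start and end times with impossible months or days should count as bad records, calling setLenient(false) on the input format is one way to get that behavior.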

hadoop fs -cat /data/mydata/good/*

1990-12-13 09:10:33;1990-12-13 09:10:33;1990-12-13 09:40:33;1;1

  • This record is valid because:
    • The date-time format is correct. Input format: yyyy-MM-ddHH:mm:ss, parse format: yyyyMMddHHmmss
    • The start time is before the end time.
    • The id is a number, not a character.
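The second format matters for the start/end comparison: in the yyyyMMddHHmmss layout every field has a fixed width and the fields run from most to least significant, so comparing the formatted strings gives the same order as comparing the timestamps. A minimal sketch of that idea follows. The names CompactTimestampSketch, toCompact and startsAfter are hypothetical, and the input pattern with an explicit space and Locale.ROOT are assumptions made to keep the output deterministic.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class CompactTimestampSketch {

    /** Converts "yyyy-MM-dd HH:mm:ss" text into the compact "yyyyMMddHHmmss" layout. */
    static String toCompact(String value) {
        SimpleDateFormat in = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        SimpleDateFormat out = new SimpleDateFormat("yyyyMMddHHmmss", Locale.ROOT);
        try {
            Date parsed = in.parse(value);
            return out.format(parsed);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not in the expected input format: " + value, e);
        }
    }

    /** True when the start time is later than the end time, using string comparison only. */
    static boolean startsAfter(String start, String end) {
        return toCompact(start).compareTo(toCompact(end)) > 0;
    }

    public static void main(String[] args) {
        System.out.println(toCompact("1990-12-13 09:10:33"));
        System.out.println(startsAfter("1990-12-13 09:10:33", "1990-12-13 09:40:33"));
        System.out.println(startsAfter("1990-12-13 09:10:33", "1990-12-13 09:01:33"));
    }
}
```

Comparing the two Date objects directly with compareTo or after would give the same answer without the formatting step.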
  3. The project structure should look like this; we need the two files shown in bold in the picture:

We hope this post has made it clear how to detect valid and invalid data in Apache Pig. If you still have any doubts or questions, ask the professionals. Stay tuned for more related updates.

I hope this blog helps you implement complex filtering logic in big data processing with Pig.


About the Author: Vijay Aegis
