Code to perform validations on time series data

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
1
down vote

favorite












Scenario:



I have time series data published to a queue (Kafka). Each Kafka message, when consumed from the topic, needs to be validated against rules that are stored in a datastore. In our case these rules are called filters.



A filter object looks like below.



import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder( "id", "name", "field", "condition", "action" )
public class Filter implements Serializable

private static final long serialVersionUID = 7488270474843121039L;
@JsonProperty("id")
private String id;
@JsonProperty("name")
private String name;
@JsonProperty("field")
private String field;
@JsonProperty("condition")
private CONDITION condition;
@JsonProperty("action")
private ACTION action;
@JsonProperty("type")
private TYPE type;

@JsonProperty("id")
public String getId()
return id;


@JsonProperty("id")
public void setId(String id)
this.id = id;


@JsonProperty("name")
public String getName()
return name;


@JsonProperty("name")
public void setName(String name)
this.name = name;


@JsonProperty("field")
public String getField()
return field;


@JsonProperty("field")
public void setField(String field)
this.field = field;


@JsonProperty("condition")
public CONDITION getCondition()
return condition;


@JsonProperty("condition")
public void setCondition(CONDITION condition)
this.condition = condition;


@JsonProperty("action")
public ACTION getAction()
return action;


@JsonProperty("action")
public void setAction(ACTION action)
this.action = action;


@JsonProperty("type")
public TYPE getType()
return type;


@JsonProperty("type")
public void setType(TYPE type)
this.type = type;





The service which consumes and performs validation is as below.



@Service
public class ConsumerService

final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

// TODO - consider renaming the interface?
@Autowired
private IFilterService filterService;

@Autowired
private ProducerService producerService;

public void consume()

String topic = "daas.filter.json.incoming";
String consGroup = "syslogJsonconsumer";

KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

logger.debug("filter Kafka Consumer Initialized......");

try
while (true)
ConsumerRecords<String, JsonNode> records = consumer.poll(100);

for (ConsumerRecord<String, JsonNode> record : records)

logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

// TODO - receive avro message instead of JSONNode.
JsonNode jsonNode = record.value();

logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

ObjectMapper objectMapper = new ObjectMapper();
NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
NormalizedSyslogMessage.class);

// TODO - consider moving below code out of Consumer.

// apply filters on the incoming message. Returns isDropped.
boolean isDropped = applyFilter(syslog);
logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

if (!isDropped)
// TODO forward the rawSyslogMessage to Correlation
// Service
producerService.produce(syslog);





catch (Exception e)
logger.error("Error occured while processing message", e);
finally
logger.debug("debupe kafka consume is closing");
consumer.close();




// load filters for a given party and apply on the rawSyslogMessage.
private boolean applyFilter(NormalizedSyslogMessage message) throws EntityNotFoundException

boolean isDropped = false;

logger.info("---applyFilter()::rawSyslog :" + message);

String partyID = message.getPartyID();

// load all filters for the given party
List<Filter> filters = filterService.getAll(partyID);

if (filters != null)
for (Filter filter : filters)
if (filter.getType() == TYPE.NUMBER_COMPARATOR)
if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.ALLOW)
if (message.getSeverity() > Long.parseLong(filter.getField()))
logger.info("message forwarded for correlation");
else
logger.info("message is dropped");

else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.ALLOW)
if (message.getSeverity() < Long.parseLong(filter.getField()))
logger.info("message forwarded for correlation");
else
logger.info("message is dropped");

else if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.DISCARD)
if (message.getSeverity() > Long.parseLong(filter.getField()))
logger.info("message is dropped");
else
logger.info("message forwarded for correlation");

else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.DISCARD)
if (message.getSeverity() < Long.parseLong(filter.getField()))
logger.info("message is dropped");
else
logger.info("message forwarded for correlation");

else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.ALLOW)
if (message.getSeverity() == Long.parseLong(filter.getField()))
logger.info("message forwarded for correlation");
else
logger.info("message is dropped");

else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.DISCARD)
if (message.getSeverity() == Long.parseLong(filter.getField()))
logger.info("message is dropped");
else
logger.info("message forwarded for correlation");

else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.ALLOW)
String operands = filter.getField().split(",");
if (Long.parseLong(operands[0]) <= message.getSeverity()
&& Long.parseLong(operands[1]) >= message.getSeverity())
logger.info("message forwarded for correlation");
else
logger.info("message is dropped");

else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.DISCARD)
String operands = filter.getField().split(",");
if (Long.parseLong(operands[0]) <= message.getSeverity()
&& Long.parseLong(operands[1]) >= message.getSeverity())
logger.info("message is dropped");
else
logger.info("message forwarded for correlation");

else
throw new IllegalArgumentException("The CONDITION or ACTION is not supported");

else if (filter.getType() == TYPE.STRING_COMPARATOR)
if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.ALLOW)
if (message.getMessage().contains(filter.getField()))
logger.info("message forwarded for correlation");
else
logger.info("message is dropped");

else if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.DISCARD)
if (message.getMessage().contains(filter.getField()))
logger.info("message is dropped");
else
logger.info("message forwarded for correlation");


else
throw new IllegalArgumentException("The filter type " + filter.getType() + " is not supported. ");



return isDropped;




The ConsumerService consumes the messages and using the unique id in the message it gets all the filters.



After getting all the filters the message is validated on the filter list and checks if the message needs to be forwarded for further processing.



I would like my code to be reviewed mainly on its design, and to learn which best practices I could follow to avoid clumsiness.



Edit: Adding the new changes to the code



After giving it a lot of thought, I have created an interface for generating and evaluating expressions. I also created a Builder class that creates an Operands object, which caters to the 2 to 3 operands needed to generate an expression.



@Service
public class ConsumerService

final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

// TODO - consider renaming the interface?
@Autowired
private IFilterService filterService;

@Autowired
private ProducerService producerService;

public void consume()

String topic = "daas.filter.json.incoming";
String consGroup = "syslogJsonconsumer";

KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

logger.debug("filter Kafka Consumer Initialized......");

try
while (true)
ConsumerRecords<String, JsonNode> records = consumer.poll(100);

for (ConsumerRecord<String, JsonNode> record : records)

logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

// TODO - receive avro message instead of JSONNode.
JsonNode jsonNode = record.value();

logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

ObjectMapper objectMapper = new ObjectMapper();
NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
NormalizedSyslogMessage.class);

// TODO - consider moving below code out of Consumer.

// apply filters on the incoming message. Returns isDropped.
boolean isDropped = applyFilter(syslog);
logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

if (!isDropped)
// TODO forward the rawSyslogMessage to Correlation
// Service
producerService.produce(syslog);





catch (Exception e)
logger.error("Error occured while processing message", e);
finally
logger.debug("debupe kafka consume is closing");
consumer.close();




// load filters for a given party and apply on the rawSyslogMessage.
private boolean applyFilter(NormalizedSyslogMessage message) throws Exception

boolean isDropped = false;

logger.info("---applyFilter()::rawSyslog :" + message);

String partyID = message.getPartyID();

// load all filters for the given party
List<Filter> filters = filterService.getAll(partyID);

if (filters != null)
for (Filter filter : filters)
Operands operands;
if (filter.getType() == TYPE.NUMBER_COMPARATOR)
operands = new Operands.OperandsBuilder(Long.toString(message.getSeverity()), filter.getField())
.build();
if (filter.getCondition().equals(CONDITION.BETWEEN))
String fields = filter.getField().split(",");
operands = new Operands.OperandsBuilder(fields[0], Long.toString(message.getSeverity()))
.middleOperand(fields[1]).build();

Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
operands);
isDropped = expression.evaluate();
else if (filter.getType() == TYPE.STRING_COMPARATOR)
operands = new Operands.OperandsBuilder(message.getMessage(), filter.getField())
.build();
Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
operands);
isDropped = expression.evaluate();
else
throw new Exception("Filter invalid, please check.");



return isDropped;



/**
 * Interface for implementing different kinds of expressions.
 */
public interface Expression {

    /**
     * Evaluates the expression.
     *
     * @return the boolean outcome of the expression
     */
    boolean evaluate();
}


/*
Factory to generate different expressions.
*/

public class ExpressionPicker

public static Expression pickExpressionBy(CONDITION condition, ACTION action, Operands operands) condition.name().equals("LESSER") && action.name().equals("ALLOW"))
return new IsLesserExpression(Integer.parseInt(operands.get_leftOperand()),
Integer.parseInt(operands.get_rightOperand()));
else if (condition.name().equals("EQUALS") && action.name().equals("ALLOW"))
return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
Integer.parseInt(operands.get_rightOperand()), ACTION.ALLOW);
else if (condition.name().equals("EQUALS") && action.name().equals("DISCARD"))
return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
Integer.parseInt(operands.get_rightOperand()), ACTION.DISCARD);
else if (condition.name().equals("BETWEEN") && action.name().equals("ALLOW"))
return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
ACTION.ALLOW);
else if (condition.name().equals("BETWEEN") && action.name().equals("DISCARD"))
return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
ACTION.DISCARD);
else if (condition.name().equals("MATCHES") && action.name().equals("ALLOW"))
return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(), ACTION.ALLOW);
else if (condition.name().equals("MATCHES") && action.name().equals("DISCARD"))
return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(),
ACTION.DISCARD);
else
throw new IllegalArgumentException("Invalid input");






public class IsEqualsExpression implements Expression

private int _leftOperand;
private int _rightOperand;
private ACTION _action;

public IsEqualsExpression(int leftOperand, int rightOperand, ACTION action)
this._leftOperand = leftOperand;
this._rightOperand = rightOperand;
this._action = action;


@Override
public boolean evaluate()
if (_action.name() == "ALLOW" && _leftOperand == _rightOperand)
return true;

return false;





public class IsGreaterExpression implements Expression

private int _leftOperand;
private int _rightOperand;

public IsGreaterExpression(int leftOperand, int rightOperand)
this._leftOperand = leftOperand;
this._rightOperand = rightOperand;


@Override
public boolean evaluate()
if (_leftOperand > _rightOperand)
return true;

return false;






public class IsInBetweenExpression implements Expression

private int _leftOperand;
private int _rightOperand;
private int _middleOperand;
private ACTION _action;

public IsInBetweenExpression(int leftOperand, int rightOperand, int middleOperand, ACTION action)
this._leftOperand = leftOperand;
this._rightOperand = rightOperand;
this._middleOperand = middleOperand;
this._action = action;


@Override
public boolean evaluate()
if (_action.name() == "ALLOW")
return getRange();

return false;


private boolean getRange()
if (_leftOperand <= _middleOperand && _rightOperand >= _middleOperand)
return true;

return false;





public class IsLesserExpression implements Expression

private int _leftOperand;
private int _rightOperand;

public IsLesserExpression(int leftOperand, int rightOperand)
this._leftOperand = leftOperand;
this._rightOperand = rightOperand;

@Override
public boolean evaluate()
if (_leftOperand > _rightOperand)
return true;

return false;




public class IsMatchingWithExpression implements Expression

private String _rightOperand;
private String _leftOperand;
private ACTION _action;

public IsMatchingWithExpression(String inputString, String patternString, ACTION action)
this._rightOperand = inputString;
this._leftOperand = patternString;
this._action = action;


@Override
public boolean evaluate()
if (_action.name() == "ALLOW")
return getMatchResult();

return false;


private boolean getMatchResult()
if (_rightOperand.contains(_leftOperand))
return true;

return false;




/**
 * Immutable holder of the two or three operands that are fed to the expressions.
 *
 * <p>Instances are created through {@link OperandsBuilder}; the middle operand is optional
 * and stays {@code null} unless it is set on the builder.
 */
public class Operands {
    private final String _rightOperand;
    private final String _leftOperand;
    private final String _middleOperand;

    private Operands(OperandsBuilder builder) {
        this._rightOperand = builder._rightOperand;
        this._leftOperand = builder._leftOperand;
        this._middleOperand = builder._middleOperand;
    }

    public String get_rightOperand() {
        return _rightOperand;
    }

    public String get_leftOperand() {
        return _leftOperand;
    }

    /** @return the middle operand, or {@code null} if none was set */
    public String get_middleOperand() {
        return _middleOperand;
    }

    /** Builder taking the two mandatory operands up front and the middle one optionally. */
    public static class OperandsBuilder {
        private final String _rightOperand;
        private final String _leftOperand;
        private String _middleOperand;

        public OperandsBuilder(String leftOperand, String rightOperand) {
            this._rightOperand = rightOperand;
            this._leftOperand = leftOperand;
        }

        /** Sets the optional middle operand and returns this builder for chaining. */
        public OperandsBuilder middleOperand(String value) {
            this._middleOperand = value;
            return this;
        }

        public Operands build() {
            return new Operands(this);
        }
    }
}






Please provide the suggestions to make the code better.



At the moment I am aware that the validation logic needs to be cleaned up, but I do not know how to do it. If anybody can help me clean up the code, I would be grateful.







share|improve this question



























    up vote
    1
    down vote

    favorite












    Scenario:



    I have time series data published to a queue (Kafka). Each Kafka message, when consumed from the topic, needs to be validated against rules that are stored in a datastore. In our case these rules are called filters.



    A filter object looks like below.



    import java.io.Serializable;
    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonPropertyOrder;

    @JsonInclude(JsonInclude.Include.NON_NULL)
    @JsonPropertyOrder( "id", "name", "field", "condition", "action" )
    public class Filter implements Serializable

    private static final long serialVersionUID = 7488270474843121039L;
    @JsonProperty("id")
    private String id;
    @JsonProperty("name")
    private String name;
    @JsonProperty("field")
    private String field;
    @JsonProperty("condition")
    private CONDITION condition;
    @JsonProperty("action")
    private ACTION action;
    @JsonProperty("type")
    private TYPE type;

    @JsonProperty("id")
    public String getId()
    return id;


    @JsonProperty("id")
    public void setId(String id)
    this.id = id;


    @JsonProperty("name")
    public String getName()
    return name;


    @JsonProperty("name")
    public void setName(String name)
    this.name = name;


    @JsonProperty("field")
    public String getField()
    return field;


    @JsonProperty("field")
    public void setField(String field)
    this.field = field;


    @JsonProperty("condition")
    public CONDITION getCondition()
    return condition;


    @JsonProperty("condition")
    public void setCondition(CONDITION condition)
    this.condition = condition;


    @JsonProperty("action")
    public ACTION getAction()
    return action;


    @JsonProperty("action")
    public void setAction(ACTION action)
    this.action = action;


    @JsonProperty("type")
    public TYPE getType()
    return type;


    @JsonProperty("type")
    public void setType(TYPE type)
    this.type = type;





    The service which consumes and performs validation is as below.



    @Service
    public class ConsumerService

    final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    // TODO - consider renaming the interface?
    @Autowired
    private IFilterService filterService;

    @Autowired
    private ProducerService producerService;

    public void consume()

    String topic = "daas.filter.json.incoming";
    String consGroup = "syslogJsonconsumer";

    KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
    STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

    logger.debug("filter Kafka Consumer Initialized......");

    try
    while (true)
    ConsumerRecords<String, JsonNode> records = consumer.poll(100);

    for (ConsumerRecord<String, JsonNode> record : records)

    logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

    // TODO - receive avro message instead of JSONNode.
    JsonNode jsonNode = record.value();

    logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

    ObjectMapper objectMapper = new ObjectMapper();
    NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
    NormalizedSyslogMessage.class);

    // TODO - consider moving below code out of Consumer.

    // apply filters on the incoming message. Returns isDropped.
    boolean isDropped = applyFilter(syslog);
    logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

    if (!isDropped)
    // TODO forward the rawSyslogMessage to Correlation
    // Service
    producerService.produce(syslog);





    catch (Exception e)
    logger.error("Error occured while processing message", e);
    finally
    logger.debug("debupe kafka consume is closing");
    consumer.close();




    // load filters for a given party and apply on the rawSyslogMessage.
    private boolean applyFilter(NormalizedSyslogMessage message) throws EntityNotFoundException

    boolean isDropped = false;

    logger.info("---applyFilter()::rawSyslog :" + message);

    String partyID = message.getPartyID();

    // load all filters for the given party
    List<Filter> filters = filterService.getAll(partyID);

    if (filters != null)
    for (Filter filter : filters)
    if (filter.getType() == TYPE.NUMBER_COMPARATOR)
    if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.ALLOW)
    if (message.getSeverity() > Long.parseLong(filter.getField()))
    logger.info("message forwarded for correlation");
    else
    logger.info("message is dropped");

    else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.ALLOW)
    if (message.getSeverity() < Long.parseLong(filter.getField()))
    logger.info("message forwarded for correlation");
    else
    logger.info("message is dropped");

    else if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.DISCARD)
    if (message.getSeverity() > Long.parseLong(filter.getField()))
    logger.info("message is dropped");
    else
    logger.info("message forwarded for correlation");

    else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.DISCARD)
    if (message.getSeverity() < Long.parseLong(filter.getField()))
    logger.info("message is dropped");
    else
    logger.info("message forwarded for correlation");

    else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.ALLOW)
    if (message.getSeverity() == Long.parseLong(filter.getField()))
    logger.info("message forwarded for correlation");
    else
    logger.info("message is dropped");

    else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.DISCARD)
    if (message.getSeverity() == Long.parseLong(filter.getField()))
    logger.info("message is dropped");
    else
    logger.info("message forwarded for correlation");

    else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.ALLOW)
    String operands = filter.getField().split(",");
    if (Long.parseLong(operands[0]) <= message.getSeverity()
    && Long.parseLong(operands[1]) >= message.getSeverity())
    logger.info("message forwarded for correlation");
    else
    logger.info("message is dropped");

    else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.DISCARD)
    String operands = filter.getField().split(",");
    if (Long.parseLong(operands[0]) <= message.getSeverity()
    && Long.parseLong(operands[1]) >= message.getSeverity())
    logger.info("message is dropped");
    else
    logger.info("message forwarded for correlation");

    else
    throw new IllegalArgumentException("The CONDITION or ACTION is not supported");

    else if (filter.getType() == TYPE.STRING_COMPARATOR)
    if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.ALLOW)
    if (message.getMessage().contains(filter.getField()))
    logger.info("message forwarded for correlation");
    else
    logger.info("message is dropped");

    else if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.DISCARD)
    if (message.getMessage().contains(filter.getField()))
    logger.info("message is dropped");
    else
    logger.info("message forwarded for correlation");


    else
    throw new IllegalArgumentException("The filter type " + filter.getType() + " is not supported. ");



    return isDropped;




    The ConsumerService consumes the messages and using the unique id in the message it gets all the filters.



    After getting all the filters the message is validated on the filter list and checks if the message needs to be forwarded for further processing.



    I would like my code to be reviewed mainly on its design, and to learn which best practices I could follow to avoid clumsiness.



    Edit: Adding the new changes to the code



    After giving it a lot of thought, I have created an interface for generating and evaluating expressions. I also created a Builder class that creates an Operands object, which caters to the 2 to 3 operands needed to generate an expression.



    @Service
    public class ConsumerService

    final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    // TODO - consider renaming the interface?
    @Autowired
    private IFilterService filterService;

    @Autowired
    private ProducerService producerService;

    public void consume()

    String topic = "daas.filter.json.incoming";
    String consGroup = "syslogJsonconsumer";

    KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
    STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

    logger.debug("filter Kafka Consumer Initialized......");

    try
    while (true)
    ConsumerRecords<String, JsonNode> records = consumer.poll(100);

    for (ConsumerRecord<String, JsonNode> record : records)

    logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

    // TODO - receive avro message instead of JSONNode.
    JsonNode jsonNode = record.value();

    logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

    ObjectMapper objectMapper = new ObjectMapper();
    NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
    NormalizedSyslogMessage.class);

    // TODO - consider moving below code out of Consumer.

    // apply filters on the incoming message. Returns isDropped.
    boolean isDropped = applyFilter(syslog);
    logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

    if (!isDropped)
    // TODO forward the rawSyslogMessage to Correlation
    // Service
    producerService.produce(syslog);





    catch (Exception e)
    logger.error("Error occured while processing message", e);
    finally
    logger.debug("debupe kafka consume is closing");
    consumer.close();




    // load filters for a given party and apply on the rawSyslogMessage.
    private boolean applyFilter(NormalizedSyslogMessage message) throws Exception

    boolean isDropped = false;

    logger.info("---applyFilter()::rawSyslog :" + message);

    String partyID = message.getPartyID();

    // load all filters for the given party
    List<Filter> filters = filterService.getAll(partyID);

    if (filters != null)
    for (Filter filter : filters)
    Operands operands;
    if (filter.getType() == TYPE.NUMBER_COMPARATOR)
    operands = new Operands.OperandsBuilder(Long.toString(message.getSeverity()), filter.getField())
    .build();
    if (filter.getCondition().equals(CONDITION.BETWEEN))
    String fields = filter.getField().split(",");
    operands = new Operands.OperandsBuilder(fields[0], Long.toString(message.getSeverity()))
    .middleOperand(fields[1]).build();

    Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
    operands);
    isDropped = expression.evaluate();
    else if (filter.getType() == TYPE.STRING_COMPARATOR)
    operands = new Operands.OperandsBuilder(message.getMessage(), filter.getField())
    .build();
    Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
    operands);
    isDropped = expression.evaluate();
    else
    throw new Exception("Filter invalid, please check.");



    return isDropped;



    /**
     * Interface for implementing different kinds of expressions.
     */
    public interface Expression {

        /**
         * Evaluates the expression.
         *
         * @return the boolean outcome of the expression
         */
        boolean evaluate();
    }


    /*
    Factory to generate different expressions.
    */

    public class ExpressionPicker

    public static Expression pickExpressionBy(CONDITION condition, ACTION action, Operands operands) condition.name().equals("LESSER") && action.name().equals("ALLOW"))
    return new IsLesserExpression(Integer.parseInt(operands.get_leftOperand()),
    Integer.parseInt(operands.get_rightOperand()));
    else if (condition.name().equals("EQUALS") && action.name().equals("ALLOW"))
    return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
    Integer.parseInt(operands.get_rightOperand()), ACTION.ALLOW);
    else if (condition.name().equals("EQUALS") && action.name().equals("DISCARD"))
    return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
    Integer.parseInt(operands.get_rightOperand()), ACTION.DISCARD);
    else if (condition.name().equals("BETWEEN") && action.name().equals("ALLOW"))
    return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
    Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
    ACTION.ALLOW);
    else if (condition.name().equals("BETWEEN") && action.name().equals("DISCARD"))
    return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
    Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
    ACTION.DISCARD);
    else if (condition.name().equals("MATCHES") && action.name().equals("ALLOW"))
    return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(), ACTION.ALLOW);
    else if (condition.name().equals("MATCHES") && action.name().equals("DISCARD"))
    return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(),
    ACTION.DISCARD);
    else
    throw new IllegalArgumentException("Invalid input");






    public class IsEqualsExpression implements Expression

    private int _leftOperand;
    private int _rightOperand;
    private ACTION _action;

    public IsEqualsExpression(int leftOperand, int rightOperand, ACTION action)
    this._leftOperand = leftOperand;
    this._rightOperand = rightOperand;
    this._action = action;


    @Override
    public boolean evaluate()
    if (_action.name() == "ALLOW" && _leftOperand == _rightOperand)
    return true;

    return false;





    public class IsGreaterExpression implements Expression

    private int _leftOperand;
    private int _rightOperand;

    public IsGreaterExpression(int leftOperand, int rightOperand)
    this._leftOperand = leftOperand;
    this._rightOperand = rightOperand;


    @Override
    public boolean evaluate()
    if (_leftOperand > _rightOperand)
    return true;

    return false;






    public class IsInBetweenExpression implements Expression

    private int _leftOperand;
    private int _rightOperand;
    private int _middleOperand;
    private ACTION _action;

    public IsInBetweenExpression(int leftOperand, int rightOperand, int middleOperand, ACTION action)
    this._leftOperand = leftOperand;
    this._rightOperand = rightOperand;
    this._middleOperand = middleOperand;
    this._action = action;


    @Override
    public boolean evaluate()
    if (_action.name() == "ALLOW")
    return getRange();

    return false;


    private boolean getRange()
    if (_leftOperand <= _middleOperand && _rightOperand >= _middleOperand)
    return true;

    return false;





    public class IsLesserExpression implements Expression

    private int _leftOperand;
    private int _rightOperand;

    public IsLesserExpression(int leftOperand, int rightOperand)
    this._leftOperand = leftOperand;
    this._rightOperand = rightOperand;

    @Override
    public boolean evaluate()
    if (_leftOperand > _rightOperand)
    return true;

    return false;




    public class IsMatchingWithExpression implements Expression

    private String _rightOperand;
    private String _leftOperand;
    private ACTION _action;

    public IsMatchingWithExpression(String inputString, String patternString, ACTION action)
    this._rightOperand = inputString;
    this._leftOperand = patternString;
    this._action = action;


    @Override
    public boolean evaluate()
    if (_action.name() == "ALLOW")
    return getMatchResult();

    return false;


    private boolean getMatchResult()
    if (_rightOperand.contains(_leftOperand))
    return true;

    return false;




    /**
     * Immutable holder for the operands fed to an expression. Two operands are mandatory; the
     * middle operand is optional and stays {@code null} unless set on the builder.
     * Instances are created through {@link OperandsBuilder}.
     */
    public class Operands {
        private final String _rightOperand;
        private final String _leftOperand;
        private final String _middleOperand;

        private Operands(OperandsBuilder builder) {
            this._rightOperand = builder._rightOperand;
            this._leftOperand = builder._leftOperand;
            this._middleOperand = builder._middleOperand;
        }

        /** @return the right operand passed to the builder */
        public String get_rightOperand() {
            return _rightOperand;
        }

        /** @return the left operand passed to the builder */
        public String get_leftOperand() {
            return _leftOperand;
        }

        /** @return the middle operand, or {@code null} when none was set */
        public String get_middleOperand() {
            return _middleOperand;
        }

        /**
         * Builder that takes the two mandatory operands up front and the optional middle
         * operand through {@link #middleOperand(String)}.
         */
        public static class OperandsBuilder {
            private final String _rightOperand;
            private final String _leftOperand;
            private String _middleOperand;

            /**
             * @param leftOperand  left operand
             * @param rightOperand right operand
             */
            public OperandsBuilder(String leftOperand, String rightOperand) {
                this._rightOperand = rightOperand;
                this._leftOperand = leftOperand;
            }

            /**
             * @param value optional middle operand
             * @return this builder, for chaining
             */
            public OperandsBuilder middleOperand(String value) {
                this._middleOperand = value;
                return this;
            }

            /** @return a new {@link Operands} carrying the values collected so far */
            public Operands build() {
                return new Operands(this);
            }
        }
    }






    Please provide suggestions to make the code better.



    At the moment I am aware that the validation logic needs to be cleaned up, but I do not know how to do it. If anybody can help me clean up the code, I would be grateful.







    share|improve this question























      up vote
      1
      down vote

      favorite









      up vote
      1
      down vote

      favorite











      Scenario:



      I have time series data published to a queue (Kafka). Each Kafka message consumed from the topic needs to be validated against rules that are stored in a datastore. In our case these rules are called filters.



      A filter object looks like below.



      import java.io.Serializable;
      import com.fasterxml.jackson.annotation.JsonInclude;
      import com.fasterxml.jackson.annotation.JsonProperty;
      import com.fasterxml.jackson.annotation.JsonPropertyOrder;

      @JsonInclude(JsonInclude.Include.NON_NULL)
      @JsonPropertyOrder( "id", "name", "field", "condition", "action" )
      public class Filter implements Serializable

      private static final long serialVersionUID = 7488270474843121039L;
      @JsonProperty("id")
      private String id;
      @JsonProperty("name")
      private String name;
      @JsonProperty("field")
      private String field;
      @JsonProperty("condition")
      private CONDITION condition;
      @JsonProperty("action")
      private ACTION action;
      @JsonProperty("type")
      private TYPE type;

      @JsonProperty("id")
      public String getId()
      return id;


      @JsonProperty("id")
      public void setId(String id)
      this.id = id;


      @JsonProperty("name")
      public String getName()
      return name;


      @JsonProperty("name")
      public void setName(String name)
      this.name = name;


      @JsonProperty("field")
      public String getField()
      return field;


      @JsonProperty("field")
      public void setField(String field)
      this.field = field;


      @JsonProperty("condition")
      public CONDITION getCondition()
      return condition;


      @JsonProperty("condition")
      public void setCondition(CONDITION condition)
      this.condition = condition;


      @JsonProperty("action")
      public ACTION getAction()
      return action;


      @JsonProperty("action")
      public void setAction(ACTION action)
      this.action = action;


      @JsonProperty("type")
      public TYPE getType()
      return type;


      @JsonProperty("type")
      public void setType(TYPE type)
      this.type = type;





      The service which consumes and performs validation is as below.



      @Service
      public class ConsumerService

      final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

      // TODO - consider renaming the interface?
      @Autowired
      private IFilterService filterService;

      @Autowired
      private ProducerService producerService;

      public void consume()

      String topic = "daas.filter.json.incoming";
      String consGroup = "syslogJsonconsumer";

      KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
      STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

      logger.debug("filter Kafka Consumer Initialized......");

      try
      while (true)
      ConsumerRecords<String, JsonNode> records = consumer.poll(100);

      for (ConsumerRecord<String, JsonNode> record : records)

      logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

      // TODO - receive avro message instead of JSONNode.
      JsonNode jsonNode = record.value();

      logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

      ObjectMapper objectMapper = new ObjectMapper();
      NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
      NormalizedSyslogMessage.class);

      // TODO - consider moving below code out of Consumer.

      // apply filters on the incoming message. Returns isDropped.
      boolean isDropped = applyFilter(syslog);
      logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

      if (!isDropped)
      // TODO forward the rawSyslogMessage to Correlation
      // Service
      producerService.produce(syslog);





      catch (Exception e)
      logger.error("Error occured while processing message", e);
      finally
      logger.debug("debupe kafka consume is closing");
      consumer.close();




      // load filters for a given party and apply on the rawSyslogMessage.
      private boolean applyFilter(NormalizedSyslogMessage message) throws EntityNotFoundException

      boolean isDropped = false;

      logger.info("---applyFilter()::rawSyslog :" + message);

      String partyID = message.getPartyID();

      // load all filters for the given party
      List<Filter> filters = filterService.getAll(partyID);

      if (filters != null)
      for (Filter filter : filters)
      if (filter.getType() == TYPE.NUMBER_COMPARATOR)
      if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() > Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() < Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() > Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() < Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() == Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() == Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.ALLOW)
      String operands = filter.getField().split(",");
      if (Long.parseLong(operands[0]) <= message.getSeverity()
      && Long.parseLong(operands[1]) >= message.getSeverity())
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.DISCARD)
      String operands = filter.getField().split(",");
      if (Long.parseLong(operands[0]) <= message.getSeverity()
      && Long.parseLong(operands[1]) >= message.getSeverity())
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else
      throw new IllegalArgumentException("The CONDITION or ACTION is not supported");

      else if (filter.getType() == TYPE.STRING_COMPARATOR)
      if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.ALLOW)
      if (message.getMessage().contains(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.DISCARD)
      if (message.getMessage().contains(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");


      else
      throw new IllegalArgumentException("The filter type " + filter.getType() + " is not supported. ");



      return isDropped;




      The ConsumerService consumes the messages and uses the unique ID in each message to load all of its filters.



      Once the filters are loaded, the message is validated against the filter list to decide whether it needs to be forwarded for further processing.



      I would like my code to be reviewed mainly on its design, and to learn which best practices I could follow to avoid clumsiness.



      Edit: Adding the new changes to the code



      After giving it a lot of thought, I have created an interface for generating and evaluating expressions. I also created a builder class to create the Operands class, which caters for 2 to 3 operands for expression generation.



      @Service
      public class ConsumerService

      final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

      // TODO - consider renaming the interface?
      @Autowired
      private IFilterService filterService;

      @Autowired
      private ProducerService producerService;

      public void consume()

      String topic = "daas.filter.json.incoming";
      String consGroup = "syslogJsonconsumer";

      KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
      STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

      logger.debug("filter Kafka Consumer Initialized......");

      try
      while (true)
      ConsumerRecords<String, JsonNode> records = consumer.poll(100);

      for (ConsumerRecord<String, JsonNode> record : records)

      logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

      // TODO - receive avro message instead of JSONNode.
      JsonNode jsonNode = record.value();

      logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

      ObjectMapper objectMapper = new ObjectMapper();
      NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
      NormalizedSyslogMessage.class);

      // TODO - consider moving below code out of Consumer.

      // apply filters on the incoming message. Returns isDropped.
      boolean isDropped = applyFilter(syslog);
      logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

      if (!isDropped)
      // TODO forward the rawSyslogMessage to Correlation
      // Service
      producerService.produce(syslog);





      catch (Exception e)
      logger.error("Error occured while processing message", e);
      finally
      logger.debug("debupe kafka consume is closing");
      consumer.close();




      // load filters for a given party and apply on the rawSyslogMessage.
      private boolean applyFilter(NormalizedSyslogMessage message) throws Exception

      boolean isDropped = false;

      logger.info("---applyFilter()::rawSyslog :" + message);

      String partyID = message.getPartyID();

      // load all filters for the given party
      List<Filter> filters = filterService.getAll(partyID);

      if (filters != null)
      for (Filter filter : filters)
      Operands operands;
      if (filter.getType() == TYPE.NUMBER_COMPARATOR)
      operands = new Operands.OperandsBuilder(Long.toString(message.getSeverity()), filter.getField())
      .build();
      if (filter.getCondition().equals(CONDITION.BETWEEN))
      String fields = filter.getField().split(",");
      operands = new Operands.OperandsBuilder(fields[0], Long.toString(message.getSeverity()))
      .middleOperand(fields[1]).build();

      Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
      operands);
      isDropped = expression.evaluate();
      else if (filter.getType() == TYPE.STRING_COMPARATOR)
      operands = new Operands.OperandsBuilder(message.getMessage(), filter.getField())
      .build();
      Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
      operands);
      isDropped = expression.evaluate();
      else
      throw new Exception("Filter invalid, please check.");



      return isDropped;



      /**
       * Interface for implementing different kinds of expressions.
       */
      public interface Expression {

          /**
           * @return the boolean outcome of this expression
           */
          boolean evaluate();
      }


      /*
      Factory to generate different expressions.
      */

      public class ExpressionPicker

      public static Expression pickExpressionBy(CONDITION condition, ACTION action, Operands operands) condition.name().equals("LESSER") && action.name().equals("ALLOW"))
      return new IsLesserExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()));
      else if (condition.name().equals("EQUALS") && action.name().equals("ALLOW"))
      return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), ACTION.ALLOW);
      else if (condition.name().equals("EQUALS") && action.name().equals("DISCARD"))
      return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), ACTION.DISCARD);
      else if (condition.name().equals("BETWEEN") && action.name().equals("ALLOW"))
      return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
      ACTION.ALLOW);
      else if (condition.name().equals("BETWEEN") && action.name().equals("DISCARD"))
      return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
      ACTION.DISCARD);
      else if (condition.name().equals("MATCHES") && action.name().equals("ALLOW"))
      return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(), ACTION.ALLOW);
      else if (condition.name().equals("MATCHES") && action.name().equals("DISCARD"))
      return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(),
      ACTION.DISCARD);
      else
      throw new IllegalArgumentException("Invalid input");






      public class IsEqualsExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;
      private ACTION _action;

      public IsEqualsExpression(int leftOperand, int rightOperand, ACTION action)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW" && _leftOperand == _rightOperand)
      return true;

      return false;





      public class IsGreaterExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;

      public IsGreaterExpression(int leftOperand, int rightOperand)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;


      @Override
      public boolean evaluate()
      if (_leftOperand > _rightOperand)
      return true;

      return false;






      public class IsInBetweenExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;
      private int _middleOperand;
      private ACTION _action;

      public IsInBetweenExpression(int leftOperand, int rightOperand, int middleOperand, ACTION action)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;
      this._middleOperand = middleOperand;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW")
      return getRange();

      return false;


      private boolean getRange()
      if (_leftOperand <= _middleOperand && _rightOperand >= _middleOperand)
      return true;

      return false;





      public class IsLesserExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;

      public IsLesserExpression(int leftOperand, int rightOperand)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;

      @Override
      public boolean evaluate()
      if (_leftOperand > _rightOperand)
      return true;

      return false;




      public class IsMatchingWithExpression implements Expression

      private String _rightOperand;
      private String _leftOperand;
      private ACTION _action;

      public IsMatchingWithExpression(String inputString, String patternString, ACTION action)
      this._rightOperand = inputString;
      this._leftOperand = patternString;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW")
      return getMatchResult();

      return false;


      private boolean getMatchResult()
      if (_rightOperand.contains(_leftOperand))
      return true;

      return false;




      /**
       * Immutable holder for the operands fed to an expression. Two operands are mandatory; the
       * middle operand is optional and stays {@code null} unless set on the builder.
       * Instances are created through {@link OperandsBuilder}.
       */
      public class Operands {
          private final String _rightOperand;
          private final String _leftOperand;
          private final String _middleOperand;

          private Operands(OperandsBuilder builder) {
              this._rightOperand = builder._rightOperand;
              this._leftOperand = builder._leftOperand;
              this._middleOperand = builder._middleOperand;
          }

          /** @return the right operand passed to the builder */
          public String get_rightOperand() {
              return _rightOperand;
          }

          /** @return the left operand passed to the builder */
          public String get_leftOperand() {
              return _leftOperand;
          }

          /** @return the middle operand, or {@code null} when none was set */
          public String get_middleOperand() {
              return _middleOperand;
          }

          /**
           * Builder that takes the two mandatory operands up front and the optional middle
           * operand through {@link #middleOperand(String)}.
           */
          public static class OperandsBuilder {
              private final String _rightOperand;
              private final String _leftOperand;
              private String _middleOperand;

              /**
               * @param leftOperand  left operand
               * @param rightOperand right operand
               */
              public OperandsBuilder(String leftOperand, String rightOperand) {
                  this._rightOperand = rightOperand;
                  this._leftOperand = leftOperand;
              }

              /**
               * @param value optional middle operand
               * @return this builder, for chaining
               */
              public OperandsBuilder middleOperand(String value) {
                  this._middleOperand = value;
                  return this;
              }

              /** @return a new {@link Operands} carrying the values collected so far */
              public Operands build() {
                  return new Operands(this);
              }
          }
      }






      Please provide suggestions to make the code better.



      At the moment I am aware that the validation logic needs to be cleaned up, but I do not know how to do it. If anybody can help me clean up the code, I would be grateful.







      share|improve this question













      Scenario:



      I have time series data published to a queue (Kafka). Each Kafka message consumed from the topic needs to be validated against rules that are stored in a datastore. In our case these rules are called filters.



      A filter object looks like below.



      import java.io.Serializable;
      import com.fasterxml.jackson.annotation.JsonInclude;
      import com.fasterxml.jackson.annotation.JsonProperty;
      import com.fasterxml.jackson.annotation.JsonPropertyOrder;

      @JsonInclude(JsonInclude.Include.NON_NULL)
      @JsonPropertyOrder( "id", "name", "field", "condition", "action" )
      public class Filter implements Serializable

      private static final long serialVersionUID = 7488270474843121039L;
      @JsonProperty("id")
      private String id;
      @JsonProperty("name")
      private String name;
      @JsonProperty("field")
      private String field;
      @JsonProperty("condition")
      private CONDITION condition;
      @JsonProperty("action")
      private ACTION action;
      @JsonProperty("type")
      private TYPE type;

      @JsonProperty("id")
      public String getId()
      return id;


      @JsonProperty("id")
      public void setId(String id)
      this.id = id;


      @JsonProperty("name")
      public String getName()
      return name;


      @JsonProperty("name")
      public void setName(String name)
      this.name = name;


      @JsonProperty("field")
      public String getField()
      return field;


      @JsonProperty("field")
      public void setField(String field)
      this.field = field;


      @JsonProperty("condition")
      public CONDITION getCondition()
      return condition;


      @JsonProperty("condition")
      public void setCondition(CONDITION condition)
      this.condition = condition;


      @JsonProperty("action")
      public ACTION getAction()
      return action;


      @JsonProperty("action")
      public void setAction(ACTION action)
      this.action = action;


      @JsonProperty("type")
      public TYPE getType()
      return type;


      @JsonProperty("type")
      public void setType(TYPE type)
      this.type = type;





      The service which consumes and performs validation is as below.



      @Service
      public class ConsumerService

      final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

      // TODO - consider renaming the interface?
      @Autowired
      private IFilterService filterService;

      @Autowired
      private ProducerService producerService;

      public void consume()

      String topic = "daas.filter.json.incoming";
      String consGroup = "syslogJsonconsumer";

      KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
      STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

      logger.debug("filter Kafka Consumer Initialized......");

      try
      while (true)
      ConsumerRecords<String, JsonNode> records = consumer.poll(100);

      for (ConsumerRecord<String, JsonNode> record : records)

      logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

      // TODO - receive avro message instead of JSONNode.
      JsonNode jsonNode = record.value();

      logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

      ObjectMapper objectMapper = new ObjectMapper();
      NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
      NormalizedSyslogMessage.class);

      // TODO - consider moving below code out of Consumer.

      // apply filters on the incoming message. Returns isDropped.
      boolean isDropped = applyFilter(syslog);
      logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

      if (!isDropped)
      // TODO forward the rawSyslogMessage to Correlation
      // Service
      producerService.produce(syslog);





      catch (Exception e)
      logger.error("Error occured while processing message", e);
      finally
      logger.debug("debupe kafka consume is closing");
      consumer.close();




      // load filters for a given party and apply on the rawSyslogMessage.
      private boolean applyFilter(NormalizedSyslogMessage message) throws EntityNotFoundException

      boolean isDropped = false;

      logger.info("---applyFilter()::rawSyslog :" + message);

      String partyID = message.getPartyID();

      // load all filters for the given party
      List<Filter> filters = filterService.getAll(partyID);

      if (filters != null)
      for (Filter filter : filters)
      if (filter.getType() == TYPE.NUMBER_COMPARATOR)
      if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() > Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() < Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.GREATER && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() > Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.LESSER && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() < Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.ALLOW)
      if (message.getSeverity() == Long.parseLong(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.EQUALS && filter.getAction() == ACTION.DISCARD)
      if (message.getSeverity() == Long.parseLong(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.ALLOW)
      String operands = filter.getField().split(",");
      if (Long.parseLong(operands[0]) <= message.getSeverity()
      && Long.parseLong(operands[1]) >= message.getSeverity())
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.BETWEEN && filter.getAction() == ACTION.DISCARD)
      String operands = filter.getField().split(",");
      if (Long.parseLong(operands[0]) <= message.getSeverity()
      && Long.parseLong(operands[1]) >= message.getSeverity())
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");

      else
      throw new IllegalArgumentException("The CONDITION or ACTION is not supported");

      else if (filter.getType() == TYPE.STRING_COMPARATOR)
      if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.ALLOW)
      if (message.getMessage().contains(filter.getField()))
      logger.info("message forwarded for correlation");
      else
      logger.info("message is dropped");

      else if (filter.getCondition() == CONDITION.MATCHES && filter.getAction() == ACTION.DISCARD)
      if (message.getMessage().contains(filter.getField()))
      logger.info("message is dropped");
      else
      logger.info("message forwarded for correlation");


      else
      throw new IllegalArgumentException("The filter type " + filter.getType() + " is not supported. ");



      return isDropped;




      The ConsumerService consumes the messages and uses the unique ID in each message to load all of its filters.



      Once the filters are loaded, the message is validated against the filter list to decide whether it needs to be forwarded for further processing.



      I would like my code to be reviewed mainly on its design, and to learn which best practices I could follow to avoid clumsiness.



      Edit: Adding the new changes to the code



      After giving it a lot of thought, I have created an interface for generating and evaluating expressions. I also created a builder class to create the Operands class, which caters for 2 to 3 operands for expression generation.



      @Service
      public class ConsumerService

      final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

      // TODO - consider renaming the interface?
      @Autowired
      private IFilterService filterService;

      @Autowired
      private ProducerService producerService;

      public void consume()

      String topic = "daas.filter.json.incoming";
      String consGroup = "syslogJsonconsumer";

      KafkaConsumer<String, JsonNode> consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup,
      STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER);

      logger.debug("filter Kafka Consumer Initialized......");

      try
      while (true)
      ConsumerRecords<String, JsonNode> records = consumer.poll(100);

      for (ConsumerRecord<String, JsonNode> record : records)

      logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());

      // TODO - receive avro message instead of JSONNode.
      JsonNode jsonNode = record.value();

      logger.info("##Filter Consumer##>>incoming Message = " + jsonNode);

      ObjectMapper objectMapper = new ObjectMapper();
      NormalizedSyslogMessage syslog = (NormalizedSyslogMessage) objectMapper.convertValue(jsonNode,
      NormalizedSyslogMessage.class);

      // TODO - consider moving below code out of Consumer.

      // apply filters on the incoming message. Returns isDropped.
      boolean isDropped = applyFilter(syslog);
      logger.info("message [" + syslog.getKey() + " - isDropped##>> = " + isDropped);

      if (!isDropped)
      // TODO forward the rawSyslogMessage to Correlation
      // Service
      producerService.produce(syslog);





      catch (Exception e)
      logger.error("Error occured while processing message", e);
      finally
      logger.debug("debupe kafka consume is closing");
      consumer.close();




      // load filters for a given party and apply on the rawSyslogMessage.
      private boolean applyFilter(NormalizedSyslogMessage message) throws Exception

      boolean isDropped = false;

      logger.info("---applyFilter()::rawSyslog :" + message);

      String partyID = message.getPartyID();

      // load all filters for the given party
      List<Filter> filters = filterService.getAll(partyID);

      if (filters != null)
      for (Filter filter : filters)
      Operands operands;
      if (filter.getType() == TYPE.NUMBER_COMPARATOR)
      operands = new Operands.OperandsBuilder(Long.toString(message.getSeverity()), filter.getField())
      .build();
      if (filter.getCondition().equals(CONDITION.BETWEEN))
      String fields = filter.getField().split(",");
      operands = new Operands.OperandsBuilder(fields[0], Long.toString(message.getSeverity()))
      .middleOperand(fields[1]).build();

      Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
      operands);
      isDropped = expression.evaluate();
      else if (filter.getType() == TYPE.STRING_COMPARATOR)
      operands = new Operands.OperandsBuilder(message.getMessage(), filter.getField())
      .build();
      Expression expression = ExpressionPicker.pickExpressionBy(filter.getCondition(), filter.getAction(),
      operands);
      isDropped = expression.evaluate();
      else
      throw new Exception("Filter invalid, please check.");



      return isDropped;



      /**
       * Contract for the different kinds of filter expressions.
       *
       * <p>An implementation captures its operands at construction time and reports the
       * outcome of its check through {@link #evaluate()}.
       */
      @FunctionalInterface
      public interface Expression {

          /**
           * Runs the check this expression represents.
           *
           * @return the boolean outcome of the check
           */
          boolean evaluate();
      }


      /*
      Factory to generate different expressions.
      */

      public class ExpressionPicker

      public static Expression pickExpressionBy(CONDITION condition, ACTION action, Operands operands) condition.name().equals("LESSER") && action.name().equals("ALLOW"))
      return new IsLesserExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()));
      else if (condition.name().equals("EQUALS") && action.name().equals("ALLOW"))
      return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), ACTION.ALLOW);
      else if (condition.name().equals("EQUALS") && action.name().equals("DISCARD"))
      return new IsEqualsExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), ACTION.DISCARD);
      else if (condition.name().equals("BETWEEN") && action.name().equals("ALLOW"))
      return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
      ACTION.ALLOW);
      else if (condition.name().equals("BETWEEN") && action.name().equals("DISCARD"))
      return new IsInBetweenExpression(Integer.parseInt(operands.get_leftOperand()),
      Integer.parseInt(operands.get_rightOperand()), Integer.parseInt(operands.get_middleOperand()),
      ACTION.DISCARD);
      else if (condition.name().equals("MATCHES") && action.name().equals("ALLOW"))
      return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(), ACTION.ALLOW);
      else if (condition.name().equals("MATCHES") && action.name().equals("DISCARD"))
      return new IsMatchingWithExpression(operands.get_leftOperand(), operands.get_rightOperand(),
      ACTION.DISCARD);
      else
      throw new IllegalArgumentException("Invalid input");






      public class IsEqualsExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;
      private ACTION _action;

      public IsEqualsExpression(int leftOperand, int rightOperand, ACTION action)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW" && _leftOperand == _rightOperand)
      return true;

      return false;





      public class IsGreaterExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;

      public IsGreaterExpression(int leftOperand, int rightOperand)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;


      @Override
      public boolean evaluate()
      if (_leftOperand > _rightOperand)
      return true;

      return false;






      public class IsInBetweenExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;
      private int _middleOperand;
      private ACTION _action;

      public IsInBetweenExpression(int leftOperand, int rightOperand, int middleOperand, ACTION action)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;
      this._middleOperand = middleOperand;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW")
      return getRange();

      return false;


      private boolean getRange()
      if (_leftOperand <= _middleOperand && _rightOperand >= _middleOperand)
      return true;

      return false;





      public class IsLesserExpression implements Expression

      private int _leftOperand;
      private int _rightOperand;

      public IsLesserExpression(int leftOperand, int rightOperand)
      this._leftOperand = leftOperand;
      this._rightOperand = rightOperand;

      @Override
      public boolean evaluate()
      if (_leftOperand > _rightOperand)
      return true;

      return false;




      public class IsMatchingWithExpression implements Expression

      private String _rightOperand;
      private String _leftOperand;
      private ACTION _action;

      public IsMatchingWithExpression(String inputString, String patternString, ACTION action)
      this._rightOperand = inputString;
      this._leftOperand = patternString;
      this._action = action;


      @Override
      public boolean evaluate()
      if (_action.name() == "ALLOW")
      return getMatchResult();

      return false;


      private boolean getMatchResult()
      if (_rightOperand.contains(_leftOperand))
      return true;

      return false;




      /**
       * Immutable holder for the operands fed to an expression, created through
       * {@link OperandsBuilder}. The left and right operands are always supplied;
       * the middle operand is optional and stays {@code null} unless set on the builder.
       */
      public class Operands {

          private final String rightOperand;
          private final String leftOperand;
          private final String middleOperand;

          private Operands(OperandsBuilder builder) {
              this.rightOperand = builder.rightOperand;
              this.leftOperand = builder.leftOperand;
              this.middleOperand = builder.middleOperand;
          }

          /** @return the right operand as passed to the builder */
          public String get_rightOperand() {
              return rightOperand;
          }

          /** @return the left operand as passed to the builder */
          public String get_leftOperand() {
              return leftOperand;
          }

          /** @return the middle operand, or {@code null} when none was set */
          public String get_middleOperand() {
              return middleOperand;
          }

          /**
           * Builder for {@link Operands}: left and right are mandatory constructor
           * arguments, the middle operand is set through {@link #middleOperand(String)}.
           */
          public static class OperandsBuilder {

              private final String rightOperand;
              private final String leftOperand;
              private String middleOperand;

              /**
               * @param leftOperand  the left operand
               * @param rightOperand the right operand
               */
              public OperandsBuilder(String leftOperand, String rightOperand) {
                  this.rightOperand = rightOperand;
                  this.leftOperand = leftOperand;
              }

              /**
               * Sets the optional middle operand.
               *
               * @param value the middle operand
               * @return this builder, for chaining
               */
              public OperandsBuilder middleOperand(String value) {
                  this.middleOperand = value;
                  return this;
              }

              /** @return a new {@link Operands} holding the values collected so far */
              public Operands build() {
                  return new Operands(this);
              }
          }
      }






      Please suggest how to make this code better.



      At the moment I am aware that the validation logic needs to be cleaned up, but I do not know how to go about it. If anybody can help me clean up the code, I would be grateful.









      share|improve this question












      share|improve this question




      share|improve this question








      edited Jan 6 at 21:51
























      asked Jan 6 at 18:27









      wandermonk

      250211




      250211




















          2 Answers
          2






          active

          oldest

          votes

















          up vote
          1
          down vote













          Here are my comments:



          1) In the first version of the consumer, you compared the enum values correctly. However, in the edited version you suddenly decided to use the String names for comparison. This is simply wrong. First, you have the popular bug of using == instead of equals in several places. Second, using enum values allows the compiler to check you didn't mistype "EQUALS" in one of the (many) places you typed it.



          2) Your expression picking logic in pickExpressionBy is overly complex and confusing in my eyes. You pick an expression based on condition and action. This results in the confusing situation of returning IsGreaterExpression when the condition is LESSER. A better approach would be to ignore the action during expression picking. This way, the connection between the filter's CONDITION and the type of Expression is simple and straightforward. The consumer gets the result of the Expression's evaluation, then queries the action and simply reverses the answer accordingly.



          Now, while we are talking about expression picking: your current logic goes to great lengths to enforce the assumption that "less than" is the opposite of "greater than" - so why did you write both expressions? You can write just one of them (for example "less than") and then have the evaluate method of "greater than" call the evaluate of "less than" and return the opposite value.



          3) Regarding responsibilities: if Filter holds the condition (and action), why not let it pick the expression? Moreover, if we do decide that a CONDITION is all that is needed, why not let the enum itself pick the expression?



          4) regarding the Expressions: the evaluate() can be simplified:



          public class IsGreaterExpression implements Expression 

          ...

          @Override
          public boolean evaluate()
          return _leftOperand > _rightOperand;







          share|improve this answer






























            up vote
            1
            down vote













            Enum names and comparison



            CONDITION and ACTION should be renamed as Condition and Action respectively, as Java types are usually not in CAPS. Also, when you want to compare equality of an enum, you shouldn't have to compare on its name(), but rather on the value itself:



            // condition.name().equals("GREATER") && action.name().equals("ALLOW")
            condition == GREATER && action == ALLOW


            == is safe for comparing enum values as they are guaranteed to match the same instance within the same class-loader.



            Boolean logic



            A lot of your code



            if (condition) 
            return true;

            return false;


            Can be simplified to just return condition.



            Also, (and this leads to the next section), you need to be more careful about how you apply negation via your Action.DISCARD, e.g. when you DISCARD LESSER values, you are ALLOW-ing both GREATER and also EQUAL values.




            ExpressionPicker and Expression



            While your ExpressionPicker and Expression are a step in the right direction, you should attempt to model it as a Predicate for an Operand.



            Then, you should consider adding some helper methods on your Operand to nicely parse the values to the required types, e.g. int in most cases.



            So, assuming you end up with an Operand similar to the following interface:



            interface Operand 
            int left();

            int middle();

            int right();

            String getLeft();

            String getRight();



            And you have a bunch of extra Condition values to better handle the negation logic:



            enum Condition 
            GREATER, GREATER_EQUALS, EQUALS, LESSER_EQUALS, LESSER, BETWEEN, MATCHES



            You can model an enumerated type of Expression values as such:



            enum Expression implements Predicate<Operand> 
            GREATER(op -> op.left() > op.right(),
            Map.of(Action.ALLOW, Condition.GREATER,
            Action.DISCARD, Condition.LESSER_EQUALS)),
            GREATER_EQUALS(op -> op.left() >= op.right(),
            Map.of(Action.ALLOW, Condition.GREATER_EQUALS,
            Action.DISCARD, Condition.LESSER)),
            EQUALS(op -> op.left() == op.right(),
            Map.of(Action.ALLOW, Condition.EQUALS)),
            NOT_EQUALS(op -> op.left() != op.right(),
            Map.of(Action.DISCARD, Condition.EQUALS)),
            LESSER_EQUALS(op -> op.left() <= op.right(),
            Map.of(Action.ALLOW, Condition.LESSER_EQUALS,
            Action.DISCARD, Condition.GREATER)),
            LESSER(op -> op.left() < op.right(),
            Map.of(Action.ALLOW, Condition.LESSER,
            Action.DISCARD, Condition.GREATER_EQUALS)),
            BETWEEN(op -> op.left() <= op.middle() && op.middle() <= op.right(),
            Map.of(Action.ALLOW, Condition.BETWEEN)),
            NOT_BETWEEN(op -> op.right() < op.middle()


            Over here, we map our conditions using the Action as the key, so that it can read something like: 'with the key to ALLOW, given the condition GREATER'.






            share|improve this answer





















              Your Answer




              StackExchange.ifUsing("editor", function ()
              return StackExchange.using("mathjaxEditing", function ()
              StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
              StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
              );
              );
              , "mathjax-editing");

              StackExchange.ifUsing("editor", function ()
              StackExchange.using("externalEditor", function ()
              StackExchange.using("snippets", function ()
              StackExchange.snippets.init();
              );
              );
              , "code-snippets");

              StackExchange.ready(function()
              var channelOptions =
              tags: "".split(" "),
              id: "196"
              ;
              initTagRenderer("".split(" "), "".split(" "), channelOptions);

              StackExchange.using("externalEditor", function()
              // Have to fire editor after snippets, if snippets enabled
              if (StackExchange.settings.snippets.snippetsEnabled)
              StackExchange.using("snippets", function()
              createEditor();
              );

              else
              createEditor();

              );

              function createEditor()
              StackExchange.prepareEditor(
              heartbeatType: 'answer',
              convertImagesToLinks: false,
              noModals: false,
              showLowRepImageUploadWarning: true,
              reputationToPostImages: null,
              bindNavPrevention: true,
              postfix: "",
              onDemand: true,
              discardSelector: ".discard-answer"
              ,immediatelyShowMarkdownHelp:true
              );



              );








               

              draft saved


              draft discarded


















              StackExchange.ready(
              function ()
              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184469%2fcode-to-perform-validations-on-time-series-data%23new-answer', 'question_page');

              );

              Post as a guest






























              2 Answers
              2






              active

              oldest

              votes








              2 Answers
              2






              active

              oldest

              votes









              active

              oldest

              votes






              active

              oldest

              votes








              up vote
              1
              down vote













              Here are my comments:



              1) In the first version of the consumer, you compared the enum values correctly. However, in the edited version you suddenly decided to use the String names for comparison. This is simply wrong. First, you have the popular bug of using == instead of equals in several places. Second, using enum values allows the compiler to check you didn't mistype "EQUALS" in one of the (many) places you typed it.



              2) Your expression picking logic in pickExpressionBy is overly complex and confusing in my eyes. You pick an expression based on condition and action. this results in the confusing situation of returning IsGreaterExpression when the condition is LESSER. A better approach would be to ignore the action during expression picking. this way, the connection between the filter's CONDITION and the type of Expression is simpler and straightforward. The consumer gets the result of the Expressions evaluation, then queries the action and simply reverse the answer accordingly.



              Now, while we are talking about expression picking: your current logic goes to great lengths to enforce the assumption that "less than" is the opposite of "greater than" - so why did you write both expressions? You can write just one of them (for example "less than") and then have the evaluate method of "greater than" call the evaluate of "less than" and return the opposite value.



              3) regarding responsibilities: if Filter holds the condition (and action) why not let it pick the expression? moreover, if we do decide that a CONDITION is all that is needed, why not let the enum itself pick the expression?



              4) regarding the Expressions: the evaluate() can be simplified:



              public class IsGreaterExpression implements Expression 

              ...

              @Override
              public boolean evaluate()
              return _leftOperand > _rightOperand;







              share|improve this answer



























                up vote
                1
                down vote













                Here are my comments:



                1) In the first version of the consumer, you compared the enum values correctly. However, in the edited version you suddenly decided to use the String names for comparison. This is simply wrong. First, you have the popular bug of using == instead of equals in several places. Second, using enum values allows the compiler to check you didn't mistype "EQUALS" in one of the (many) places you typed it.



                2) Your expression picking logic in pickExpressionBy is overly complex and confusing in my eyes. You pick an expression based on condition and action. this results in the confusing situation of returning IsGreaterExpression when the condition is LESSER. A better approach would be to ignore the action during expression picking. this way, the connection between the filter's CONDITION and the type of Expression is simpler and straightforward. The consumer gets the result of the Expressions evaluation, then queries the action and simply reverse the answer accordingly.



                Now, while we are talking about expression picking: your current logic goes to great lengths to enforce the assumption that "less than" is the opposite of "greater than" - so why did you write both expressions? You can write just one of them (for example "less than") and then have the evaluate method of "greater than" call the evaluate of "less than" and return the opposite value.



                3) regarding responsibilities: if Filter holds the condition (and action) why not let it pick the expression? moreover, if we do decide that a CONDITION is all that is needed, why not let the enum itself pick the expression?



                4) regarding the Expressions: the evaluate() can be simplified:



                public class IsGreaterExpression implements Expression 

                ...

                @Override
                public boolean evaluate()
                return _leftOperand > _rightOperand;







                share|improve this answer

























                  up vote
                  1
                  down vote










                  up vote
                  1
                  down vote









                  Here are my comments:



                  1) In the first version of the consumer, you compared the enum values correctly. However, in the edited version you suddenly decided to use the String names for comparison. This is simply wrong. First, you have the popular bug of using == instead of equals in several places. Second, using enum values allows the compiler to check you didn't mistype "EQUALS" in one of the (many) places you typed it.



                  2) Your expression picking logic in pickExpressionBy is overly complex and confusing in my eyes. You pick an expression based on condition and action. this results in the confusing situation of returning IsGreaterExpression when the condition is LESSER. A better approach would be to ignore the action during expression picking. this way, the connection between the filter's CONDITION and the type of Expression is simpler and straightforward. The consumer gets the result of the Expressions evaluation, then queries the action and simply reverse the answer accordingly.



                  Now, while we are talking about expression picking: your current logic goes to great lengths to enforce the assumption that "less than" is the opposite of "greater than" - so why did you write both expressions? You can write just one of them (for example "less than") and then have the evaluate method of "greater than" call the evaluate of "less than" and return the opposite value.



                  3) regarding responsibilities: if Filter holds the condition (and action) why not let it pick the expression? moreover, if we do decide that a CONDITION is all that is needed, why not let the enum itself pick the expression?



                  4) regarding the Expressions: the evaluate() can be simplified:



                  public class IsGreaterExpression implements Expression 

                  ...

                  @Override
                  public boolean evaluate()
                  return _leftOperand > _rightOperand;







                  share|improve this answer















                  Here are my comments:



                  1) In the first version of the consumer, you compared the enum values correctly. However, in the edited version you suddenly decided to use the String names for comparison. This is simply wrong. First, you have the popular bug of using == instead of equals in several places. Second, using enum values allows the compiler to check you didn't mistype "EQUALS" in one of the (many) places you typed it.



                  2) Your expression picking logic in pickExpressionBy is overly complex and confusing in my eyes. You pick an expression based on condition and action. this results in the confusing situation of returning IsGreaterExpression when the condition is LESSER. A better approach would be to ignore the action during expression picking. this way, the connection between the filter's CONDITION and the type of Expression is simpler and straightforward. The consumer gets the result of the Expressions evaluation, then queries the action and simply reverse the answer accordingly.



                  Now, while we are talking about expression picking: your current logic goes to great lengths to enforce the assumption that "less than" is the opposite of "greater than" - so why did you write both expressions? You can write just one of them (for example "less than") and then have the evaluate method of "greater than" call the evaluate of "less than" and return the opposite value.



                  3) regarding responsibilities: if Filter holds the condition (and action) why not let it pick the expression? moreover, if we do decide that a CONDITION is all that is needed, why not let the enum itself pick the expression?



                  4) regarding the Expressions: the evaluate() can be simplified:



                  public class IsGreaterExpression implements Expression 

                  ...

                  @Override
                  public boolean evaluate()
                  return _leftOperand > _rightOperand;








                  share|improve this answer















                  share|improve this answer



                  share|improve this answer








                  edited Jan 8 at 9:57


























                  answered Jan 8 at 9:47









                  Sharon Ben Asher

                  2,073512




                  2,073512






















                      up vote
                      1
                      down vote













                      Enum names and comparison



                      CONDITION and ACTION should be renamed as Condition and Action respectively, as Java types are usually not in CAPS. Also, when you want to compare equality of an enum, you shouldn't have to compare on its name(), but rather on the value itself:



                      // condition.name().equals("GREATER") && action.name().equals("ALLOW")
                      condition == GREATER && action == ALLOW


                      == is safe for comparing enum values as they are guaranteed to match the same instance within the same class-loader.



                      Boolean logic



                      A lot of your code



                      if (condition) 
                      return true;

                      return false;


                      Can be simplified to just return condition.



                      Also, (and this leads to the next section), you need to be more careful about how you apply negation via your Action.DISCARD, e.g. when you DISCARD LESSER values, you are ALLOW-ing both GREATER and also EQUAL values.




                      ExpressionPicker and Expression



                      While your ExpressionPicker and Expression is a step in the right direction, you should attempt to model it as a Predicate for an Operand.



                      Then, you should consider adding some helper methods on your Operand to nicely parse the values to the required types, e.g. int in most cases.



                      So, assuming you end up with an Operand similar to the following interface:



                      interface Operand 
                      int left();

                      int middle();

                      int right();

                      String getLeft();

                      String getRight();



                      And you have a bunch of extra Condition values to better handle the negation logic:



                      enum Condition 
                      GREATER, GREATER_EQUALS, EQUALS, LESSER_EQUALS, LESSER, BETWEEN, MATCHES



                      You can model an enumerated type of Expression values as such:



                      enum Expression implements Predicate<Operand> 
                      GREATER(op -> op.left() > op.right(),
                      Map.of(Action.ALLOW, Condition.GREATER,
                      Action.DISCARD, Condition.LESSER_EQUALS)),
                      GREATER_EQUALS(op -> op.left() >= op.right(),
                      Map.of(Action.ALLOW, Condition.GREATER_EQUALS,
                      Action.DISCARD, Condition.LESSER)),
                      EQUALS(op -> op.left() == op.right(),
                      Map.of(Action.ALLOW, Condition.EQUALS)),
                      NOT_EQUALS(op -> op.left() != op.right(),
                      Map.of(Action.DISCARD, Condition.EQUALS)),
                      LESSER_EQUALS(op -> op.left() <= op.right(),
                      Map.of(Action.ALLOW, Condition.LESSER_EQUALS,
                      Action.DISCARD, Condition.GREATER)),
                      LESSER(op -> op.left() < op.right(),
                      Map.of(Action.ALLOW, Condition.LESSER,
                      Action.DISCARD, Condition.GREATER_EQUALS)),
                      BETWEEN(op -> op.left() <= op.middle() && op.middle() <= op.right(),
                      Map.of(Action.ALLOW, Condition.BETWEEN)),
                      NOT_BETWEEN(op -> op.right() < op.middle()


                      Over here, we map our conditions using the Action as the key, so that it can read something like: 'with the key to ALLOW, given the condition GREATER'.






                      share|improve this answer

























                        up vote
                        1
                        down vote













                        Enum names and comparison



                        CONDITION and ACTION should be renamed as Condition and Action respectively, as Java types are usually not in CAPS. Also, when you want to compare equality of an enum, you shouldn't have to compare on its name(), but rather on the value itself:



                        // condition.name().equals("GREATER") && action.name().equals("ALLOW")
                        condition == GREATER && action == ALLOW


                        == is safe for comparing enum values as they are guaranteed to match the same instance within the same class-loader.



                        Boolean logic



                        A lot of your code



                        if (condition) 
                        return true;

                        return false;


                        Can be simplified to just return condition.



                        Also, (and this leads to the next section), you need to be more careful about how you apply negation via your Action.DISCARD, e.g. when you DISCARD LESSER values, you are ALLOW-ing both GREATER and also EQUAL values.




                        ExpressionPicker and Expression



                        While your ExpressionPicker and Expression is a step in the right direction, you should attempt to model it as a Predicate for an Operand.



                        Then, you should consider adding some helper methods on your Operand to nicely parse the values to the required types, e.g. int in most cases.



                        So, assuming you end up with an Operand similar to the following interface:



                        interface Operand 
                        int left();

                        int middle();

                        int right();

                        String getLeft();

                        String getRight();



                        And you have a bunch of extra Condition values to better handle the negation logic:



                        enum Condition 
                        GREATER, GREATER_EQUALS, EQUALS, LESSER_EQUALS, LESSER, BETWEEN, MATCHES



                        You can model an enumerated type of Expression values as such:



                        enum Expression implements Predicate<Operand> 
                        GREATER(op -> op.left() > op.right(),
                        Map.of(Action.ALLOW, Condition.GREATER,
                        Action.DISCARD, Condition.LESSER_EQUALS)),
                        GREATER_EQUALS(op -> op.left() >= op.right(),
                        Map.of(Action.ALLOW, Condition.GREATER_EQUALS,
                        Action.DISCARD, Condition.LESSER)),
                        EQUALS(op -> op.left() == op.right(),
                        Map.of(Action.ALLOW, Condition.EQUALS)),
                        NOT_EQUALS(op -> op.left() != op.right(),
                        Map.of(Action.DISCARD, Condition.EQUALS)),
                        LESSER_EQUALS(op -> op.left() <= op.right(),
                        Map.of(Action.ALLOW, Condition.LESSER_EQUALS,
                        Action.DISCARD, Condition.GREATER)),
                        LESSER(op -> op.left() < op.right(),
                        Map.of(Action.ALLOW, Condition.LESSER,
                        Action.DISCARD, Condition.GREATER_EQUALS)),
                        BETWEEN(op -> op.left() <= op.middle() && op.middle() <= op.right(),
                        Map.of(Action.ALLOW, Condition.BETWEEN)),
                        NOT_BETWEEN(op -> op.right() < op.middle()


                        Over here, we map our conditions using the Action as the key, so that it can read something like: 'with the key to ALLOW, given the condition GREATER'.






                        share|improve this answer























                          up vote
                          1
                          down vote










                          up vote
                          1
                          down vote









                          Enum names and comparison



                          CONDITION and ACTION should be renamed as Condition and Action respectively, as Java types are usually not in CAPS. Also, when you want to compare equality of an enum, you shouldn't have to compare on its name(), but rather on the value itself:



                          // condition.name().equals("GREATER") && action.name().equals("ALLOW")
                          condition == GREATER && action == ALLOW


                          == is safe for comparing enum values as they are guaranteed to match the same instance within the same class-loader.



                          Boolean logic



                          A lot of your code



                          if (condition) 
                          return true;

                          return false;


                          Can be simplified to just return condition.



                          Also (and this leads to the next section), you need to be more careful about how you apply negation via your Action.DISCARD, e.g. when you DISCARD LESSER values, you are ALLOW-ing both GREATER and EQUAL values.




                          ExpressionPicker and Expression



                          While your ExpressionPicker and Expression are a step in the right direction, you should attempt to model them as a Predicate for an Operand.



                          Then, you should consider adding some helper methods on your Operand to nicely parse the values to the required types, e.g. int in most cases.



                          So, assuming you end up with an Operand similar to the following interface:



                          interface Operand 
                          int left();

                          int middle();

                          int right();

                          String getLeft();

                          String getRight();



                          And you have a bunch of extra Condition values to better handle the negation logic:



                          enum Condition 
                          GREATER, GREATER_EQUALS, EQUALS, LESSER_EQUALS, LESSER, BETWEEN, MATCHES



                          You can model an enumerated type of Expression values as such:



                          enum Expression implements Predicate<Operand> 
                          GREATER(op -> op.left() > op.right(),
                          Map.of(Action.ALLOW, Condition.GREATER,
                          Action.DISCARD, Condition.LESSER_EQUALS)),
                          GREATER_EQUALS(op -> op.left() >= op.right(),
                          Map.of(Action.ALLOW, Condition.GREATER_EQUALS,
                          Action.DISCARD, Condition.LESSER)),
                          EQUALS(op -> op.left() == op.right(),
                          Map.of(Action.ALLOW, Condition.EQUALS)),
                          NOT_EQUALS(op -> op.left() != op.right(),
                          Map.of(Action.DISCARD, Condition.EQUALS)),
                          LESSER_EQUALS(op -> op.left() <= op.right(),
                          Map.of(Action.ALLOW, Condition.LESSER_EQUALS,
                          Action.DISCARD, Condition.GREATER)),
                          LESSER(op -> op.left() < op.right(),
                          Map.of(Action.ALLOW, Condition.LESSER,
                          Action.DISCARD, Condition.GREATER_EQUALS)),
                          BETWEEN(op -> op.left() <= op.middle() && op.middle() <= op.right(),
                          Map.of(Action.ALLOW, Condition.BETWEEN)),
                          NOT_BETWEEN(op -> op.right() < op.middle()


                          Over here, we map our conditions using the Action as the key, so that it can read something like: 'with the key to ALLOW, given the condition GREATER'.






                          share|improve this answer













                          Enum names and comparison



                          CONDITION and ACTION should be renamed as Condition and Action respectively, as Java types are usually not in CAPS. Also, when you want to compare equality of an enum, you shouldn't have to compare on its name(), but rather on the value itself:



                          // condition.name().equals("GREATER") && action.name().equals("ALLOW")
                          condition == GREATER && action == ALLOW


                          == is safe for comparing enum values as they are guaranteed to match the same instance within the same class-loader.



                          Boolean logic



                          A lot of your code



                          if (condition) 
                          return true;

                          return false;


                          Can be simplified to just return condition.



                          Also (and this leads to the next section), you need to be more careful about how you apply negation via your Action.DISCARD, e.g. when you DISCARD LESSER values, you are ALLOW-ing both GREATER and EQUAL values.




                          ExpressionPicker and Expression



                          While your ExpressionPicker and Expression are a step in the right direction, you should attempt to model them as a Predicate for an Operand.



                          Then, you should consider adding some helper methods on your Operand to nicely parse the values to the required types, e.g. int in most cases.



                          So, assuming you end up with an Operand similar to the following interface:



                          interface Operand 
                          int left();

                          int middle();

                          int right();

                          String getLeft();

                          String getRight();



                          And you have a bunch of extra Condition values to better handle the negation logic:



                          enum Condition 
                          GREATER, GREATER_EQUALS, EQUALS, LESSER_EQUALS, LESSER, BETWEEN, MATCHES



                          You can model an enumerated type of Expression values as such:



                          enum Expression implements Predicate<Operand> 
                          GREATER(op -> op.left() > op.right(),
                          Map.of(Action.ALLOW, Condition.GREATER,
                          Action.DISCARD, Condition.LESSER_EQUALS)),
                          GREATER_EQUALS(op -> op.left() >= op.right(),
                          Map.of(Action.ALLOW, Condition.GREATER_EQUALS,
                          Action.DISCARD, Condition.LESSER)),
                          EQUALS(op -> op.left() == op.right(),
                          Map.of(Action.ALLOW, Condition.EQUALS)),
                          NOT_EQUALS(op -> op.left() != op.right(),
                          Map.of(Action.DISCARD, Condition.EQUALS)),
                          LESSER_EQUALS(op -> op.left() <= op.right(),
                          Map.of(Action.ALLOW, Condition.LESSER_EQUALS,
                          Action.DISCARD, Condition.GREATER)),
                          LESSER(op -> op.left() < op.right(),
                          Map.of(Action.ALLOW, Condition.LESSER,
                          Action.DISCARD, Condition.GREATER_EQUALS)),
                          BETWEEN(op -> op.left() <= op.middle() && op.middle() <= op.right(),
                          Map.of(Action.ALLOW, Condition.BETWEEN)),
                          NOT_BETWEEN(op -> op.right() < op.middle()


                          Over here, we map our conditions using the Action as the key, so that it can read something like: 'with the key to ALLOW, given the condition GREATER'.







                          share|improve this answer













                          share|improve this answer



                          share|improve this answer











                          answered Jan 8 at 11:15









                          h.j.k.

                          18.1k32490




                          18.1k32490






















                               

                              draft saved


                              draft discarded


























                               


                              draft saved


                              draft discarded














                              StackExchange.ready(
                              function ()
                              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f184469%2fcode-to-perform-validations-on-time-series-data%23new-answer', 'question_page');

                              );

                              Post as a guest













































































                              Popular posts from this blog

                              Chat program with C++ and SFML

                              Function to Return a JSON Like Objects Using VBA Collections and Arrays

                              Will my employers contract hold up in court?