Monday, August 20, 2018

Sending data to Kinesis Firehose on AWS

On a recent project, we needed to send API metrics to a Kinesis Firehose delivery stream, which then delivers the data to an S3 bucket. The Spring component below serializes each metrics object to JSON and writes it to the delivery stream asynchronously, using the AWS SDK for Java:


       

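import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;

// AwsCredentialsProperties, AwsKinesisProperties and AwsUtils are project-specific classes (not shown).
@Component
public class ApiLoggingService {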

 private static final Logger LOGGER = LoggerFactory.getLogger(ApiLoggingService.class);

 @Autowired
 private AwsCredentialsProperties awsCredentialsProperties;

 @Autowired
 private AwsKinesisProperties firehoseProperties;

 private AmazonKinesisFirehose firehoseClient;

 // Thread pool that runs the Firehose writes off the caller's thread.
 private ExecutorService executorService;

 @PostConstruct
 private void initFirehoseClient() {
  firehoseClient = AmazonKinesisFirehoseClientBuilder.standard()
    .withCredentials(AwsUtils.createCredentialsProvider(awsCredentialsProperties.getKeyId(),
      awsCredentialsProperties.getSecret()))
    .withRegion(firehoseProperties.getFirehoseRegion()).build();
  
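  // Fall back to 20 worker threads when no valid pool size is configured.
  final int poolSize = firehoseProperties.getThreadpoolSize() > 0
    ? firehoseProperties.getThreadpoolSize()
    : 20;
  executorService = Executors.newFixedThreadPool(poolSize);
 }

 @PreDestroy
 private void shutdownExecutor() {
  // Stop accepting new tasks on application shutdown; already queued tasks still run.
  executorService.shutdown();
 }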

 /**
  * Asynchronously writes data to an AWS Kinesis Firehose delivery stream.
  * Nothing is sent if the data or the configured stream name is null or
  * empty. The region and the delivery stream name are read from the
  * properties file. This method uses the PutRecord API operation, which
  * writes a single record to the delivery stream. At the time of writing,
  * each delivery stream can by default take in up to 2,000 transactions per
  * second, 5,000 records per second, or 5 MB per second. PutRecord returns
  * a RecordId, a unique string assigned to each record, which producer
  * applications can use for purposes such as auditing and investigation.
  * 
  * @param dataSet
  *             data to log
  */
 public void logEvent(final Object dataSet) {
  final String streamName = firehoseProperties.getFirehoseDeliveryStream();
  if (!StringUtils.isNullOrEmpty(streamName) && null != dataSet) {
   // Spawn a thread to log data into Firehose.
   executorService.submit(() -> {
    LOGGER.debug("Going to log data into Firehose delivery stream: {}", streamName);
    try {
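     // Serialize to JSON. ObjectMapper is thread-safe once configured, so a single shared instance could be reused here.
     final String data = new ObjectMapper().writeValueAsString(dataSet);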
     LOGGER.debug("Following data is being logged: {}", data);
     if (!StringUtils.isNullOrEmpty(data)) {
      final PutRecordRequest putRequest = new PutRecordRequest();
      putRequest.setDeliveryStreamName(streamName);
      // Encode explicitly as UTF-8 rather than relying on the platform default charset.
      putRequest.setRecord(
        new Record().withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))));
      final PutRecordResult putResult = firehoseClient.putRecord(putRequest);
      // Parameterized logging already skips message formatting when debug is disabled.
      LOGGER.debug("Data added to Firehose stream {} with Record Id: {}", streamName,
        putResult.getRecordId());
     } else {
      LOGGER.info("Nothing is logged into Firehose because data to be logged is null or empty.");
     }

    } catch (final IOException ex) {
     LOGGER.error("Exception occurred while serializing the data to JSON.", ex);
    } catch (final Exception ex) {
     LOGGER.error("Exception while trying to log data.", ex);
    }
   });
  } else {
   LOGGER.info("Either stream name or dataset is null/empty.");
  }
 }
}
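
The Javadoc above notes that putRecord writes one record per call and that a delivery stream has per-second limits. If the volume of metrics grows, one option is to group records and send them with PutRecordBatch instead. The sketch below is not part of the service above; it only shows the grouping step in plain Java, assuming the documented PutRecordBatch limits of 500 records and 4 MiB per call (check the current AWS documentation before relying on them). The class name FirehoseBatcher and its methods encode and partition are hypothetical. The encode helper also appends a newline, a common way to keep records separable once Firehose concatenates them into S3 objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups serialized records into batches sized for one PutRecordBatch call. */
final class FirehoseBatcher {

    // Assumed limits for a single PutRecordBatch call: 500 records and 4 MiB in total.
    static final int MAX_RECORDS_PER_BATCH = 500;
    static final int MAX_BYTES_PER_BATCH = 4 * 1024 * 1024;

    private FirehoseBatcher() {
    }

    /** Encodes one JSON document as UTF-8 with a trailing newline as a record delimiter. */
    static byte[] encode(final String json) {
        return (json + "\n").getBytes(StandardCharsets.UTF_8);
    }

    /** Splits records into consecutive batches that respect both limits, preserving order. */
    static List<List<byte[]>> partition(final List<byte[]> records, final int maxRecords, final int maxBytes) {
        if (maxRecords < 1 || maxBytes < 1) {
            throw new IllegalArgumentException("Limits must be positive");
        }
        final List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (final byte[] record : records) {
            if (record.length > maxBytes) {
                throw new IllegalArgumentException("Record of " + record.length + " bytes exceeds the batch limit");
            }
            // Start a new batch when adding this record would break either limit.
            final boolean full = current.size() == maxRecords || currentBytes + record.length > maxBytes;
            if (full && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);
            currentBytes += record.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each inner list returned by partition would then be wrapped in Record objects and sent in one putRecordBatch call on the Firehose client. That call reports per-record failures in its result, so a real implementation would also need to retry the failed entries.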


You can invoke this as follows:
       


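@Component
public class A {

 // Spring injects the ApiLoggingService bean; A must itself be a Spring-managed bean.
 @Autowired
 private ApiLoggingService loggingService;

 public void someMethod(final Object dataSet) {
  // Queue the data for asynchronous delivery to Firehose; this call returns immediately.
  loggingService.logEvent(dataSet);
 }
}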
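
Because logEvent returns immediately, callers never notice when Firehose is slow or unreachable: tasks simply pile up in the executor. Executors.newFixedThreadPool uses an unbounded queue, so a long outage could consume a lot of memory. One possible mitigation, sketched below, is a pool with a bounded queue that drops and counts metrics when it is full. This is only a sketch under that assumption, not part of the service above; the class name BoundedLoggingExecutor and its methods are hypothetical, and dropping metrics is a trade-off that may not suit every use case.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: a fixed-size pool with a bounded queue that counts dropped tasks. */
final class BoundedLoggingExecutor {

    private final AtomicLong dropped = new AtomicLong();
    private final ThreadPoolExecutor executor;

    BoundedLoggingExecutor(final int threads, final int queueCapacity) {
        executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // Rejection handler: drop the task and count it instead of blocking the caller.
                (task, pool) -> dropped.incrementAndGet());
    }

    /** Queues the task if there is room; otherwise the task is dropped and counted. */
    void submit(final Runnable task) {
        executor.execute(task);
    }

    /** Number of tasks rejected so far because the queue was full or the pool was shut down. */
    long droppedCount() {
        return dropped.get();
    }

    /** Stops accepting tasks and waits up to the timeout for queued ones to finish. */
    boolean shutdown(final long timeout, final TimeUnit unit) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeout, unit);
    }
}
```

In the service above, such a class could stand in for the ExecutorService field, with droppedCount() exposed as a metric so that lost records are at least visible.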


