In a recent project, we needed to send API metrics to a Kinesis Firehose delivery stream, which then delivers the data to an S3 bucket. The Java code that sends data to the Firehose delivery stream is given below:
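import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;

// AwsCredentialsProperties, AwsKinesisProperties and AwsUtils are project-specific classes (not shown).
@Component
public class ApiLoggingService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ApiLoggingService.class);

    @Autowired
    private AwsCredentialsProperties awsCredentialsProperties;

    @Autowired
    private AwsKinesisProperties firehoseProperties;

    private AmazonKinesisFirehose firehoseClient;

    private ExecutorService executorService;

    @PostConstruct
    private void initFirehoseClient() {
        firehoseClient = AmazonKinesisFirehoseClientBuilder.standard()
                .withCredentials(AwsUtils.createCredentialsProvider(awsCredentialsProperties.getKeyId(),
                        awsCredentialsProperties.getSecret()))
                .withRegion(firehoseProperties.getFirehoseRegion()).build();
        // Use the configured pool size, falling back to 20 threads when none is set.
        executorService = firehoseProperties.getThreadpoolSize() > 0
                ? Executors.newFixedThreadPool(firehoseProperties.getThreadpoolSize())
                : Executors.newFixedThreadPool(20);
    }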

    /**
     * Asynchronously logs data to an AWS Kinesis Firehose delivery stream.
     * Nothing is logged if the value is null or empty. The region and the
     * name of the delivery stream are read from the properties file. This
     * method uses the PutRecord API operation, which writes a single record
     * to the delivery stream. By default, each delivery stream can take in
     * up to 2,000 transactions per second, 5,000 records per second, or
     * 5 MB per second. The PutRecord operation returns a RecordId, which is a
     * unique string assigned to each record. Producer applications can use
     * this ID for purposes such as auditability and investigation.
     *
     * @param dataSet
     *            data to log
     */
    public void logEvent(final Object dataSet) {
        final String streamName = firehoseProperties.getFirehoseDeliveryStream();
        if (!StringUtils.isNullOrEmpty(streamName) && null != dataSet) {
            // Hand the work to the thread pool so the caller is not blocked by the Firehose call.
            executorService.submit(() -> {
                LOGGER.debug("Going to log data into Firehose delivery stream: {}", streamName);
                try {
                    final String data = new ObjectMapper().writeValueAsString(dataSet);
                    LOGGER.debug("Following data is being logged: {}", data);
                    if (!StringUtils.isNullOrEmpty(data)) {
                        final PutRecordRequest putRequest = new PutRecordRequest();
                        putRequest.setDeliveryStreamName(streamName);
                        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                        putRequest.setRecord(
                                new Record().withData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))));
                        final PutRecordResult putResult = firehoseClient.putRecord(putRequest);
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Data added to Firehose stream {} with Record Id: {}", streamName,
                                    putResult.getRecordId());
                        }
                    } else {
                        LOGGER.info("Nothing is logged into Firehose because data to be logged is null or empty.");
                    }
                } catch (final IOException ex) {
                    // Jackson's JsonProcessingException is a subclass of IOException.
                    LOGGER.error("Exception occurred when trying to serialize the data object to JSON.", ex);
                } catch (final Exception ex) {
                    LOGGER.error("Exception while trying to log data.", ex);
                }
            });
        } else {
            LOGGER.info("Either stream name or dataset is null/empty.");
        }
    }
}
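One detail worth keeping in mind when the destination is S3: Firehose buffers incoming records and writes them out back to back, without adding a separator of its own, so a common practice is to end each record with a delimiter such as a newline. The following is a minimal sketch of how the record payload could be built under that assumption. The class and method names (FirehosePayloads, toRecordData) are hypothetical and not part of the service above; the resulting ByteBuffer would be passed to Record.withData.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original service.
final class FirehosePayloads {

    private FirehosePayloads() {
    }

    /**
     * Encodes one JSON document as UTF-8 and makes sure it ends with a
     * newline, so that records stay separable once they are concatenated
     * in the destination object.
     */
    static ByteBuffer toRecordData(final String json) {
        final String line = json.endsWith("\n") ? json : json + "\n";
        return ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) {
        final ByteBuffer data = toRecordData("{\"api\":\"/orders\",\"latencyMs\":42}");
        // Decode again only to show what would be sent.
        System.out.print(StandardCharsets.UTF_8.decode(data));
    }
}
```

Whether a newline is the right delimiter depends on how the files in S3 are consumed later; newline-delimited JSON is simply a format that many downstream tools can read line by line.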
You can invoke this as follows:
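// The calling class must itself be a Spring-managed bean for @Autowired to work.
@Component
public class A {

    // Inject the logging service
    @Autowired
    private ApiLoggingService loggingService;

    public void someMethod(final Object dataSet) {
        // Invoke the logging service; the call returns immediately
        loggingService.logEvent(dataSet);
    }
}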
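Because logEvent only queues the work on a thread pool, the caller returns before the record is actually sent. One consequence is that events still waiting in the queue can be lost when the application stops, and the pool's non-daemon threads can delay JVM exit if the pool is never shut down. The sketch below shows the same fire-and-forget pattern with an orderly shutdown, using only java.util.concurrent. AsyncEventLogger and its Consumer sink are hypothetical stand-ins for the service and the Firehose putRecord call; in a Spring bean, the body of close() could be placed in a method annotated with @PreDestroy.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the logging service, without Spring or AWS dependencies.
final class AsyncEventLogger implements AutoCloseable {

    private final ExecutorService executor;
    // Stand-in for the call that sends one record to Firehose.
    private final Consumer<String> sink;

    AsyncEventLogger(final int threads, final Consumer<String> sink) {
        // Same fallback as the service above: 20 threads when no size is configured.
        this.executor = Executors.newFixedThreadPool(threads > 0 ? threads : 20);
        this.sink = sink;
    }

    /** Queues the event and returns immediately; failures are reported on the worker thread. */
    void logEvent(final String event) {
        executor.submit(() -> {
            try {
                sink.accept(event);
            } catch (final RuntimeException ex) {
                System.err.println("Failed to log event: " + ex);
            }
        });
    }

    /** Stops accepting new events and waits up to 10 seconds for queued ones to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (final InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(final String[] args) {
        try (AsyncEventLogger logger = new AsyncEventLogger(2, e -> System.out.println("sent " + e))) {
            logger.logEvent("{\"api\":\"/orders\"}");
        }
    }
}
```

The 10-second timeout is an arbitrary choice for illustration; a suitable value depends on how long a Firehose call may take and how much shutdown delay the application can tolerate.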