티스토리 뷰
AWS Java SDK v2를 사용하여 AWS Elasticsearch에 쿼리하는 코드를 작성해보자
Gradle 추가
Gradle에 Elasticsearch 버전을 맞추어 아래를 추가한다.
implementation 'org.elasticsearch.client:elasticsearch-rest-client:7.7.0'
implementation 'com.google.code.gson:gson:2.8.6'
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:apache-client'
참고로 지금 내가 쓰는 es 버전은 7.4인데 sdk 버전은 7.7.0으로 했음에도 잘 동작하는 것을 확인했다.
기본 Elasticsearch 접근 코드
이제 Java 코드에서 Elasticsearch에 접근 할 수 있는 코드는 아래 사이트를 참고했다.
https://yookeun.github.io/elasticsearch/2017/11/05/elastic-api/
이걸 AWS Elasticsearch에서 그대로 사용하려고 하면 아래와 같은 에러가 난다.
- host에 https:// 를 붙인 경우
java.io.IOException: https://your_domain.us-west-2.es.amazonaws.com: nodename nor servname provided, or not known - https:// 떼고 엔드포인트명만 붙인 경우
java.net.ConnectException: Connection refused
결국 둘다 안된다. 이는 이제 AWS 문서를 보고 해결해보자
아쉽게도 AWS Java SDK v1 에서 동작하는 샘플만 있다.
AWS Java SDK v2에서 동작하게 만들기
위 코드가 AWS Java SDK v2에서 돌지 않는 가장 큰 이유 중 하나는 AWS Java SDK v2용 AWSRequestSigningApacheInterceptor가 없기 때문이다.
이는 이 사이트(https://gist.github.com/danielprinz/edd82f8bde7d66c3293ba0b20f395892)에서 가져왔더니 정상동작 하는 것을 확인했다.
package your_package_name;
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
/**
* AWS SDK v2 version of: https://github.com/awslabs/aws-request-signing-apache-interceptor/blob/master/src/main/java/com/amazonaws/http/AWSRequestSigningApacheInterceptor.java
*/
@RequiredArgsConstructor
public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {
private final Aws4Signer signer;
private final Aws4SignerParams params;
@Override
public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
URIBuilder uriBuilder;
try {
uriBuilder = new URIBuilder(request.getRequestLine().getUri());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
}
final SdkHttpFullRequest.Builder signableRequestBuilder = SdkHttpFullRequest.builder();
final HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
if (host != null) {
signableRequestBuilder.uri(URI.create(host.toURI()));
}
final SdkHttpMethod httpMethod =
SdkHttpMethod.fromValue(request.getRequestLine().getMethod());
signableRequestBuilder.method(httpMethod);
try {
signableRequestBuilder.encodedPath(uriBuilder.build().getRawPath());
} catch (URISyntaxException e) {
throw new IOException("Invalid URI" , e);
}
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
(HttpEntityEnclosingRequest) request;
if (httpEntityEnclosingRequest.getEntity() != null) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
httpEntityEnclosingRequest.getEntity().writeTo(outputStream);
signableRequestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(outputStream.toByteArray()));
}
}
// Append Parameters and Headers
nvpToMapParams(uriBuilder.getQueryParams()).forEach(signableRequestBuilder::appendRawQueryParameter);
headerArrayToMap(request.getAllHeaders()).forEach(signableRequestBuilder::appendHeader);
// Sign it
final SdkHttpFullRequest signedRequest = signer.sign(signableRequestBuilder.build(), params);
// Now copy everything back
request.setHeaders(mapToHeaderArray(signedRequest.headers()));
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest httpEntityEnclosingRequest =
(HttpEntityEnclosingRequest) request;
if (httpEntityEnclosingRequest.getEntity() != null) {
BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
if (signedRequest.contentStreamProvider().isPresent()) {
basicHttpEntity.setContent(signedRequest.contentStreamProvider().get().newStream());
} else {
throw new RuntimeException("Empty content stream was not expected!");
}
httpEntityEnclosingRequest.setEntity(basicHttpEntity);
}
}
}
/**
*
* @param params list of HTTP query params as NameValuePairs
* @return a Multimap of HTTP query params
*/
private static Map<String, String> nvpToMapParams(final List<NameValuePair> params) {
Map<String, String> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (NameValuePair nvp : params) {
parameterMap.putIfAbsent(nvp.getName(), nvp.getValue());
}
return parameterMap;
}
/**
* @param headers modeled Header objects
* @return a Map of header entries
*/
private static Map<String, String> headerArrayToMap(final Header[] headers) {
Map<String, String> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (Header header : headers) {
if (!skipHeader(header)) {
headersMap.put(header.getName(), header.getValue());
}
}
return headersMap;
}
/**
* @param header header line to check
* @return true if the given header should be excluded when signing
*/
private static boolean skipHeader(final Header header) {
return ("content-length".equalsIgnoreCase(header.getName())
&& "0".equals(header.getValue())) // Strip Content-Length: 0
|| "host".equalsIgnoreCase(header.getName()); // Host comes from endpoint
}
/**
* @param mapHeaders Map of header entries
* @return modeled Header objects
*/
private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) {
Header[] headers = new Header[mapHeaders.size()];
int i = 0;
for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) {
for (String value : headerEntry.getValue()) {
headers[i++] = new BasicHeader(headerEntry.getKey(), value);
}
}
return headers;
}
}
그리고 Elasticsearch를 사용하기 위한 코드는 아래와 같다.
package your_package_name;
import java.util.HashMap;
import java.util.Map;
import com.google.gson.Gson;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.regions.Region;
import org.apache.http.entity.ContentType;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class ElasticApi {
@Value("${awselasticsearch.host}") //properties 파일을 참고한다
private String aesEndpoint;
/**
* 엘라스틱서치에서 제공하는 api를 이용한 전송메소드
* @param method
* @param url
* @param obj
* @param jsonData
* @return
*/
public Map<String, Object> callElasticApi(final String method, final String url, final Object obj, final String jsonData) {
final Map<String, Object> result = new HashMap<>();
String jsonString;
//json형태의 파라미터가 아니라면 gson으로 만들어주자.
if (jsonData == null) {
final Gson gson = new Gson();
jsonString = gson.toJson(obj);
} else {
jsonString = jsonData;
}
// 원하는 CredentialsProvider를 사용하자
AwsCredentialsProvider awsCredentialsProvider = ProfileCredentialsProvider.create("default");
Aws4SignerParams signerParams = Aws4SignerParams.builder()
.signingName("es")
.signingRegion(Region.US_WEST_2) // 원하는 리전을 넣자
.awsCredentials(awsCredentialsProvider.resolveCredentials())
.build();
Aws4Signer signer = Aws4Signer.create();
HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(signer, signerParams);
//엘라스틱서치에서 제공하는 restClient를 통해 엘라스틱서치에 접속한다
try(RestClient restClient = RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)).build()) {
//엘라스틱서치에서 제공하는 response 객체
Response response = null;
//DELETE 메소드는 HttpEntity가 필요없다
if (method.equals("DELETE")) {
Request request = new Request(method, url);
request.addParameter("pretty", "true");
response = restClient.performRequest(request);
} else {
final HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Request request = new Request(method, url);
request.addParameter("pretty", "true");
request.setEntity(entity);
response = restClient.performRequest(request);
}
//앨라스틱서치에서 리턴되는 응답코드를 받는다
final int statusCode = response.getStatusLine().getStatusCode();
//엘라스틱서치에서 리턴되는 응답메시지를 받는다
final String responseBody = EntityUtils.toString(response.getEntity());
result.put("resultCode", statusCode);
result.put("resultBody", responseBody);
} catch (final Exception e) {
result.put("resultCode", -1);
result.put("resultBody", e.toString());
}
return result;
}
}
위 코드는 Spring에서 동작하도록 @Component 로 구성하였다.
또한 Credentials은 아래 사이트를 참고하여 원하는 CredentialsProvider를 사용하도록 하자
https://docs.aws.amazon.com/ko_kr/sdk-for-java/v2/developer-guide/credentials.html
사용하기
이제 모든 준비가 끝났으니 ElasticApi를 사용해보자
@Autowired
ElasticApi elasticApi;
String campaignId = "<YOUR_CAMPAIGN_ID>"
String indexingPath = "/pinpoint*/_search";
String json = String.format("{\"_source\":[\"event_timestamp\"],\"query\":{\"bool\":{\"filter\":[{\"term\":{\"event_type.keyword\":\"_email.send\"}},{\"term\":{\"attributes.campaign_id.keyword\":\"%s\"}}]}}}", campaignId);
Map<String, Object> elasticResult = elasticApi.callElasticApi("GET", indexingPath, null, json);
System.out.println(elasticResult.get("resultCode"));
System.out.println(elasticResult.get("resultBody"));
위를 수행하면 원하는 결과가 나오는 것을 확인 할 수 있다.
영문
AWS Elasticsearch for AWS Java SDK v2
레퍼런스
AWS Java SDK Elasticsearch 제어 샘플
Elasticsearch 접근 코드 샘플
https://yookeun.github.io/elasticsearch/2017/11/05/elastic-api/
AWSRequestSigningInterceptor for AWS Java SDK v2
https://gist.github.com/danielprinz/edd82f8bde7d66c3293ba0b20f395892
'Cloud > AWS' 카테고리의 다른 글
보안 정책을 통하여 유저에게 MFA 강제하기 (0) | 2020.08.24 |
---|---|
CDK Deploy를 위한 최소한의 Policy (0) | 2020.07.28 |
AWS Pinpoint → Kinesis Firehose → Elasticsearch Event stream 활성화 (0) | 2020.07.10 |
Amazon Pinpoint 이벤트 트리거 (0) | 2020.05.31 |
AWS CloudFormation 안에 Lambda 코드를 포함하여 배포하기 (0) | 2020.04.19 |
AWS Code Build에서 Code Commit 코드 가져오기 (0) | 2020.03.29 |
node에서 aws-sdk 사용할때 promise 쓰기 (0) | 2020.03.26 |
AWS Step Functions 예제 / 람다에서 람다 호출하기 (0) | 2020.03.24 |
- Total
- Today
- Yesterday
- 드라이버
- Troubleshooting
- Cloud
- 프로그래밍
- db
- database
- gcc
- Python
- linux
- 리눅스
- kering
- Visual C++
- C
- winapi
- algorithm
- java
- API
- 안드로이드
- 음악
- driver
- android
- MFC
- AWS
- jni강좌
- it
- jni
- Quiz
- source
- C++
- NDK
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |