티스토리 뷰

AWS Java SDK v2를 사용하여 AWS Elasticsearch에 쿼리하는 코드를 작성해보자

Gradle 추가


Gradle에 Elasticsearch 버전을 맞추어 아래를 추가한다.

implementation 'org.elasticsearch.client:elasticsearch-rest-client:7.7.0'
implementation 'com.google.code.gson:gson:2.8.6'
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:apache-client'

참고로 지금 내가 쓰는 es 버전은 7.4인데 sdk 버전은 7.7.0으로 했음에도 잘 동작하는 것을 확인했다.

기본 Elasticsearch 접근 코드


이제 Java 코드에서 Elasticsearch에 접근 할 수 있는 코드는 아래 사이트를 참고했다.

https://yookeun.github.io/elasticsearch/2017/11/05/elastic-api/

이걸 AWS Elasticsearch에서 그대로 사용하려고 하면 아래와 같은 에러가 난다.

  • host에 https:// 를 붙인 경우
    java.io.IOException: https://your_domain.us-west-2.es.amazonaws.com: nodename nor servname provided, or not known
  • https:// 떼고 엔드포인트명만 붙인 경우
    java.net.ConnectException: Connection refused

결국 둘다 안된다. 이는 이제 AWS 문서를 보고 해결해보자

https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/es-request-signing.html

아쉽게도 AWS Java SDK v1 에서 동작하는 샘플만 있다.

AWS Java SDK v2에서 동작하게 만들기


위 코드가 AWS Java SDK v2에서 돌지 않는 가장 큰 이유 중 하나는 AWS Java SDK v2용 AWSRequestSigningApacheInterceptor가 없기 때문이다.

이는 이 사이트(https://gist.github.com/danielprinz/edd82f8bde7d66c3293ba0b20f395892)에서 가져왔더니 정상동작 하는 것을 확인했다.

package your_package_name;

import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;

import lombok.RequiredArgsConstructor;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;

/**
 * AWS SDK v2 version of: https://github.com/awslabs/aws-request-signing-apache-interceptor/blob/master/src/main/java/com/amazonaws/http/AWSRequestSigningApacheInterceptor.java
 */
@RequiredArgsConstructor
public class AWSRequestSigningApacheInterceptor implements HttpRequestInterceptor {

  private final Aws4Signer signer;
  private final Aws4SignerParams params;

  @Override
  public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
    URIBuilder uriBuilder;
    try {
      uriBuilder = new URIBuilder(request.getRequestLine().getUri());
    } catch (URISyntaxException e) {
      throw new IOException("Invalid URI" , e);
    }

    final SdkHttpFullRequest.Builder signableRequestBuilder = SdkHttpFullRequest.builder();

    final HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
    if (host != null) {
      signableRequestBuilder.uri(URI.create(host.toURI()));
    }
    final SdkHttpMethod httpMethod =
      SdkHttpMethod.fromValue(request.getRequestLine().getMethod());
    signableRequestBuilder.method(httpMethod);
    try {
      signableRequestBuilder.encodedPath(uriBuilder.build().getRawPath());
    } catch (URISyntaxException e) {
      throw new IOException("Invalid URI" , e);
    }

    if (request instanceof HttpEntityEnclosingRequest) {
      HttpEntityEnclosingRequest httpEntityEnclosingRequest =
        (HttpEntityEnclosingRequest) request;
      if (httpEntityEnclosingRequest.getEntity() != null) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        httpEntityEnclosingRequest.getEntity().writeTo(outputStream);
        signableRequestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(outputStream.toByteArray()));
      }
    }

    // Append Parameters and Headers
    nvpToMapParams(uriBuilder.getQueryParams()).forEach(signableRequestBuilder::appendRawQueryParameter);
    headerArrayToMap(request.getAllHeaders()).forEach(signableRequestBuilder::appendHeader);

    // Sign it
    final SdkHttpFullRequest signedRequest = signer.sign(signableRequestBuilder.build(), params);

    // Now copy everything back
    request.setHeaders(mapToHeaderArray(signedRequest.headers()));
    if (request instanceof HttpEntityEnclosingRequest) {
      HttpEntityEnclosingRequest httpEntityEnclosingRequest =
        (HttpEntityEnclosingRequest) request;
      if (httpEntityEnclosingRequest.getEntity() != null) {
        BasicHttpEntity basicHttpEntity = new BasicHttpEntity();
        if (signedRequest.contentStreamProvider().isPresent()) {
          basicHttpEntity.setContent(signedRequest.contentStreamProvider().get().newStream());
        } else {
          throw new RuntimeException("Empty content stream was not expected!");
        }
        httpEntityEnclosingRequest.setEntity(basicHttpEntity);
      }
    }
  }

  /**
   *
   * @param params list of HTTP query params as NameValuePairs
   * @return a Multimap of HTTP query params
   */
  private static Map<String, String> nvpToMapParams(final List<NameValuePair> params) {
    Map<String, String> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (NameValuePair nvp : params) {
      parameterMap.putIfAbsent(nvp.getName(), nvp.getValue());
    }
    return parameterMap;
  }

  /**
   * @param headers modeled Header objects
   * @return a Map of header entries
   */
  private static Map<String, String> headerArrayToMap(final Header[] headers) {
    Map<String, String> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    for (Header header : headers) {
      if (!skipHeader(header)) {
        headersMap.put(header.getName(), header.getValue());
      }
    }
    return headersMap;
  }

  /**
   * @param header header line to check
   * @return true if the given header should be excluded when signing
   */
  private static boolean skipHeader(final Header header) {
    return ("content-length".equalsIgnoreCase(header.getName())
      && "0".equals(header.getValue())) // Strip Content-Length: 0
      || "host".equalsIgnoreCase(header.getName()); // Host comes from endpoint
  }

  /**
   * @param mapHeaders Map of header entries
   * @return modeled Header objects
   */
  private static Header[] mapToHeaderArray(final Map<String, List<String>> mapHeaders) {
    Header[] headers = new Header[mapHeaders.size()];
    int i = 0;
    for (Map.Entry<String, List<String>> headerEntry : mapHeaders.entrySet()) {
      for (String value : headerEntry.getValue()) {
        headers[i++] = new BasicHeader(headerEntry.getKey(), value);
      }
    }
    return headers;
  }

}

그리고 Elasticsearch를 사용하기 위한 코드는 아래와 같다.

package your_package_name;

import java.util.HashMap;
import java.util.Map;
import com.google.gson.Gson;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.regions.Region;
import org.apache.http.entity.ContentType;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class ElasticApi {

    @Value("${awselasticsearch.host}") //properties 파일을 참고한다
    private String aesEndpoint;

    /**
     * 엘라스틱서치에서 제공하는 api를 이용한 전송메소드
     * @param method
     * @param url
     * @param obj
     * @param jsonData
     * @return
     */
    public Map<String, Object> callElasticApi(final String method, final String url, final Object obj, final String jsonData) {
        final Map<String, Object> result = new HashMap<>();

        String jsonString;
        //json형태의 파라미터가 아니라면 gson으로 만들어주자.
        if (jsonData == null) {
            final Gson gson = new Gson();
            jsonString = gson.toJson(obj);
        } else {
            jsonString = jsonData;
        }

        // 원하는 CredentialsProvider를 사용하자
        AwsCredentialsProvider awsCredentialsProvider = ProfileCredentialsProvider.create("default");

        Aws4SignerParams signerParams = Aws4SignerParams.builder()
            .signingName("es")
            .signingRegion(Region.US_WEST_2) // 원하는 리전을 넣자
            .awsCredentials(awsCredentialsProvider.resolveCredentials())
            .build();
        Aws4Signer signer = Aws4Signer.create();
        HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(signer, signerParams);

        //엘라스틱서치에서 제공하는 restClient를 통해 엘라스틱서치에 접속한다
        try(RestClient restClient = RestClient.builder(HttpHost.create(aesEndpoint)).setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor)).build()) {
            //엘라스틱서치에서 제공하는 response 객체
            Response response = null;

            //DELETE 메소드는 HttpEntity가 필요없다
            if (method.equals("DELETE")) {
                Request request = new Request(method, url);
                request.addParameter("pretty", "true");
                response = restClient.performRequest(request);
            } else {
                final HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
                Request request = new Request(method, url);
                request.addParameter("pretty", "true");
                request.setEntity(entity);
                response = restClient.performRequest(request);

            }
            //앨라스틱서치에서 리턴되는 응답코드를 받는다
            final int statusCode = response.getStatusLine().getStatusCode();
            //엘라스틱서치에서 리턴되는 응답메시지를 받는다
            final String responseBody = EntityUtils.toString(response.getEntity());
            result.put("resultCode", statusCode);
            result.put("resultBody", responseBody);
        } catch (final Exception e) {
            result.put("resultCode", -1);
            result.put("resultBody", e.toString());
        }
        return result;
    }
}

위 코드는 Spring에서 동작하도록 @Component 로 구성하였다.

또한 Credentials은 아래 사이트를 참고하여 원하는 CredentialsProvider를 사용하도록 하자

https://docs.aws.amazon.com/ko_kr/sdk-for-java/v2/developer-guide/credentials.html

사용하기


이제 모든 준비가 끝났으니 ElasticApi를 사용해보자

@Autowired
ElasticApi elasticApi;

String campaignId = "<YOUR_CAMPAIGN_ID>"
String indexingPath = "/pinpoint*/_search";
String json = String.format("{\"_source\":[\"event_timestamp\"],\"query\":{\"bool\":{\"filter\":[{\"term\":{\"event_type.keyword\":\"_email.send\"}},{\"term\":{\"attributes.campaign_id.keyword\":\"%s\"}}]}}}", campaignId);

Map<String, Object> elasticResult = elasticApi.callElasticApi("GET", indexingPath, null, json);
System.out.println(elasticResult.get("resultCode"));
System.out.println(elasticResult.get("resultBody"));

위를 수행하면 원하는 결과가 나오는 것을 확인 할 수 있다.

영문


AWS Elasticsearch for AWS Java SDK v2

레퍼런스


AWS Java SDK Elasticsearch 제어 샘플

https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/es-request-signing.html

Elasticsearch 접근 코드 샘플

https://yookeun.github.io/elasticsearch/2017/11/05/elastic-api/

AWSRequestSigningInterceptor for AWS Java SDK v2

https://gist.github.com/danielprinz/edd82f8bde7d66c3293ba0b20f395892

댓글
댓글쓰기 폼