| 1 | package pl.zankowski.iextrading4j.client.sse.manager; | |
| 2 | ||
| 3 | import com.google.common.collect.Maps; | |
| 4 | import javax.ws.rs.core.MediaType; | |
| 5 | import javax.ws.rs.sse.SseEventSource; | |
| 6 | import pl.zankowski.iextrading4j.client.IEXCloudToken; | |
| 7 | ||
| 8 | import java.util.Map; | |
| 9 | import java.util.function.Consumer; | |
| 10 | ||
| 11 | import static java.util.stream.Collectors.joining; | |
| 12 | ||
| 13 | public class SseManager { | |
| 14 | ||
| 15 | private static final String TOKEN_QUERY_PARAM = "token"; | |
| 16 | ||
| 17 | private final Map<SseRequest, SseEventSource> sseStore = Maps.newConcurrentMap(); | |
| 18 | ||
| 19 | private final SseClient sseClient; | |
| 20 | ||
| 21 | public SseManager(final SseClient sseClient) { | |
| 22 | this.sseClient = sseClient; | |
| 23 | } | |
| 24 | ||
| 25 | public <T> void subscribe(final SseRequest<T> request, final Consumer<T> consumer) { | |
| 26 | final String url = createURL(request, sseClient.getSseClientMetadata().getToken(), | |
| 27 | sseClient.getSseClientMetadata().getUrl()); | |
| 28 | ||
| 29 | final SseEventSource eventSource = SseEventSource.target(sseClient.getClient().target(url)).build(); | |
| 30 | ||
| 31 | try { | |
| 32 |
1
1. subscribe : negated conditional → NO_COVERAGE |
if (sseStore.containsKey(request)) { |
| 33 | return; | |
| 34 | } | |
| 35 | ||
| 36 |
1
1. subscribe : removed call to javax/ws/rs/sse/SseEventSource::register → NO_COVERAGE |
eventSource.register(event -> { |
| 37 | final T receivedData = event.readData(request.getResponseType(), MediaType.APPLICATION_JSON_TYPE); | |
| 38 |
1
1. lambda$subscribe$0 : removed call to java/util/function/Consumer::accept → NO_COVERAGE |
consumer.accept(receivedData); |
| 39 | }); | |
| 40 |
1
1. subscribe : removed call to javax/ws/rs/sse/SseEventSource::open → NO_COVERAGE |
eventSource.open(); |
| 41 | ||
| 42 | sseStore.put(request, eventSource); | |
| 43 | } catch (final Exception e) { | |
| 44 | throw new IllegalStateException(e.getMessage(), e); | |
| 45 | } | |
| 46 | } | |
| 47 | ||
| 48 | public <T> void unsubscribe(final SseRequest<T> request) { | |
| 49 | final SseEventSource socket = sseStore.remove(request); | |
| 50 |
1
1. unsubscribe : negated conditional → NO_COVERAGE |
if (socket == null) { |
| 51 | return; | |
| 52 | } | |
| 53 | ||
| 54 |
1
1. unsubscribe : removed call to javax/ws/rs/sse/SseEventSource::close → NO_COVERAGE |
socket.close(); |
| 55 | } | |
| 56 | ||
| 57 | private <R> String createURL(final SseRequest<R> sseRequest, final IEXCloudToken token, | |
| 58 | final String url) { | |
| 59 |
1
1. createURL : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createURL → NO_COVERAGE |
return new StringBuilder() |
| 60 | .append(url) | |
| 61 | .append(createPath(sseRequest.getPath(), sseRequest.getPathParams())) | |
| 62 | .append(createQueryParameters(sseRequest.getQueryParams(), resolveUrlToken(sseRequest, token))) | |
| 63 | .toString(); | |
| 64 | } | |
| 65 | ||
| 66 | private <R> String resolveUrlToken(final SseRequest<R> sseRequest, final IEXCloudToken token) { | |
| 67 |
1
1. resolveUrlToken : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::resolveUrlToken → NO_COVERAGE |
return resolveToken(sseRequest, token); |
| 68 | } | |
| 69 | ||
| 70 | private <R> String resolveToken(final SseRequest<R> sseRequest, final IEXCloudToken token) { | |
| 71 |
2
1. resolveToken : negated conditional → NO_COVERAGE 2. resolveToken : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::resolveToken → NO_COVERAGE |
return token == null |
| 72 | ? null | |
| 73 |
1
1. resolveToken : negated conditional → NO_COVERAGE |
: sseRequest.getUseSecretToken() |
| 74 | ? token.getSecretToken() | |
| 75 | : token.getPublishableToken(); | |
| 76 | } | |
| 77 | ||
| 78 | private String createPath(final String originalPath, final Map<String, String> pathParams) { | |
| 79 | String path = originalPath; | |
| 80 | for (final Map.Entry<String, String> entry : pathParams.entrySet()) { | |
| 81 | path = path.replaceFirst("\\{" + entry.getKey() + "\\}", entry.getValue()); | |
| 82 | } | |
| 83 | ||
| 84 |
1
1. createPath : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createPath → NO_COVERAGE |
return path; |
| 85 | } | |
| 86 | ||
| 87 | private String createQueryParameters(final Map<String, String> queryParams, final String publishableToken) { | |
| 88 |
2
1. createQueryParameters : negated conditional → NO_COVERAGE 2. createQueryParameters : negated conditional → NO_COVERAGE |
if (queryParams.isEmpty() && publishableToken == null) { |
| 89 | return ""; | |
| 90 | } | |
| 91 | ||
| 92 | final Map<String, String> paramsCopy = Maps.newHashMap(queryParams); | |
| 93 |
1
1. createQueryParameters : negated conditional → NO_COVERAGE |
if (publishableToken != null) { |
| 94 | paramsCopy.put(TOKEN_QUERY_PARAM, publishableToken); | |
| 95 | } | |
| 96 | ||
| 97 |
1
1. createQueryParameters : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createQueryParameters → NO_COVERAGE |
return paramsCopy.entrySet().stream() |
| 98 | .map(this::createQueryParam) | |
| 99 | .collect(joining("&", "?", "")); | |
| 100 | } | |
| 101 | ||
| 102 | private String createQueryParam(final Map.Entry<String, String> queryParam) { | |
| 103 |
1
1. createQueryParam : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createQueryParam → NO_COVERAGE |
return queryParam.getKey() + "=" + queryParam.getValue(); |
| 104 | } | |
| 105 | ||
| 106 | } | |
Mutations | ||
| 32 |
1.1 |
|
| 36 |
1.1 |
|
| 38 |
1.1 |
|
| 40 |
1.1 |
|
| 50 |
1.1 |
|
| 54 |
1.1 |
|
| 59 |
1.1 |
|
| 67 |
1.1 |
|
| 71 |
1.1 2.2 |
|
| 73 |
1.1 |
|
| 84 |
1.1 |
|
| 88 |
1.1 2.2 |
|
| 93 |
1.1 |
|
| 97 |
1.1 |
|
| 103 |
1.1 |