1 | package pl.zankowski.iextrading4j.client.sse.manager; | |
2 | ||
3 | import com.google.common.collect.Maps; | |
4 | import javax.ws.rs.core.MediaType; | |
5 | import javax.ws.rs.sse.SseEventSource; | |
6 | import pl.zankowski.iextrading4j.client.IEXCloudToken; | |
7 | ||
8 | import java.util.Map; | |
9 | import java.util.function.Consumer; | |
10 | ||
11 | import static java.util.stream.Collectors.joining; | |
12 | ||
13 | public class SseManager { | |
14 | ||
15 | private static final String TOKEN_QUERY_PARAM = "token"; | |
16 | ||
17 | private final Map<SseRequest, SseEventSource> sseStore = Maps.newConcurrentMap(); | |
18 | ||
19 | private final SseClient sseClient; | |
20 | ||
21 | public SseManager(final SseClient sseClient) { | |
22 | this.sseClient = sseClient; | |
23 | } | |
24 | ||
25 | public <T> void subscribe(final SseRequest<T> request, final Consumer<T> consumer) { | |
26 | final String url = createURL(request, sseClient.getSseClientMetadata().getToken(), | |
27 | sseClient.getSseClientMetadata().getUrl()); | |
28 | ||
29 | final SseEventSource eventSource = SseEventSource.target(sseClient.getClient().target(url)).build(); | |
30 | ||
31 | try { | |
32 |
1
1. subscribe : negated conditional → NO_COVERAGE |
if (sseStore.containsKey(request)) { |
33 | return; | |
34 | } | |
35 | ||
36 |
1
1. subscribe : removed call to javax/ws/rs/sse/SseEventSource::register → NO_COVERAGE |
eventSource.register(event -> { |
37 | final T receivedData = event.readData(request.getResponseType(), MediaType.APPLICATION_JSON_TYPE); | |
38 |
1
1. lambda$subscribe$0 : removed call to java/util/function/Consumer::accept → NO_COVERAGE |
consumer.accept(receivedData); |
39 | }); | |
40 |
1
1. subscribe : removed call to javax/ws/rs/sse/SseEventSource::open → NO_COVERAGE |
eventSource.open(); |
41 | ||
42 | sseStore.put(request, eventSource); | |
43 | } catch (final Exception e) { | |
44 | throw new IllegalStateException(e.getMessage(), e); | |
45 | } | |
46 | } | |
47 | ||
48 | public <T> void unsubscribe(final SseRequest<T> request) { | |
49 | final SseEventSource socket = sseStore.remove(request); | |
50 |
1
1. unsubscribe : negated conditional → NO_COVERAGE |
if (socket == null) { |
51 | return; | |
52 | } | |
53 | ||
54 |
1
1. unsubscribe : removed call to javax/ws/rs/sse/SseEventSource::close → NO_COVERAGE |
socket.close(); |
55 | } | |
56 | ||
57 | private <R> String createURL(final SseRequest<R> sseRequest, final IEXCloudToken token, | |
58 | final String url) { | |
59 |
1
1. createURL : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createURL → NO_COVERAGE |
return new StringBuilder() |
60 | .append(url) | |
61 | .append(createPath(sseRequest.getPath(), sseRequest.getPathParams())) | |
62 | .append(createQueryParameters(sseRequest.getQueryParams(), resolveUrlToken(sseRequest, token))) | |
63 | .toString(); | |
64 | } | |
65 | ||
66 | private <R> String resolveUrlToken(final SseRequest<R> sseRequest, final IEXCloudToken token) { | |
67 |
1
1. resolveUrlToken : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::resolveUrlToken → NO_COVERAGE |
return resolveToken(sseRequest, token); |
68 | } | |
69 | ||
70 | private <R> String resolveToken(final SseRequest<R> sseRequest, final IEXCloudToken token) { | |
71 |
2
1. resolveToken : negated conditional → NO_COVERAGE 2. resolveToken : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::resolveToken → NO_COVERAGE |
return token == null |
72 | ? null | |
73 |
1
1. resolveToken : negated conditional → NO_COVERAGE |
: sseRequest.getUseSecretToken() |
74 | ? token.getSecretToken() | |
75 | : token.getPublishableToken(); | |
76 | } | |
77 | ||
78 | private String createPath(final String originalPath, final Map<String, String> pathParams) { | |
79 | String path = originalPath; | |
80 | for (final Map.Entry<String, String> entry : pathParams.entrySet()) { | |
81 | path = path.replaceFirst("\\{" + entry.getKey() + "\\}", entry.getValue()); | |
82 | } | |
83 | ||
84 |
1
1. createPath : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createPath → NO_COVERAGE |
return path; |
85 | } | |
86 | ||
87 | private String createQueryParameters(final Map<String, String> queryParams, final String publishableToken) { | |
88 |
2
1. createQueryParameters : negated conditional → NO_COVERAGE 2. createQueryParameters : negated conditional → NO_COVERAGE |
if (queryParams.isEmpty() && publishableToken == null) { |
89 | return ""; | |
90 | } | |
91 | ||
92 | final Map<String, String> paramsCopy = Maps.newHashMap(queryParams); | |
93 |
1
1. createQueryParameters : negated conditional → NO_COVERAGE |
if (publishableToken != null) { |
94 | paramsCopy.put(TOKEN_QUERY_PARAM, publishableToken); | |
95 | } | |
96 | ||
97 |
1
1. createQueryParameters : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createQueryParameters → NO_COVERAGE |
return paramsCopy.entrySet().stream() |
98 | .map(this::createQueryParam) | |
99 | .collect(joining("&", "?", "")); | |
100 | } | |
101 | ||
102 | private String createQueryParam(final Map.Entry<String, String> queryParam) { | |
103 |
1
1. createQueryParam : replaced return value with "" for pl/zankowski/iextrading4j/client/sse/manager/SseManager::createQueryParam → NO_COVERAGE |
return queryParam.getKey() + "=" + queryParam.getValue(); |
104 | } | |
105 | ||
106 | } | |
Mutations | ||
32 |
1.1 |
|
36 |
1.1 |
|
38 |
1.1 |
|
40 |
1.1 |
|
50 |
1.1 |
|
54 |
1.1 |
|
59 |
1.1 |
|
67 |
1.1 |
|
71 |
1.1 2.2 |
|
73 |
1.1 |
|
84 |
1.1 |
|
88 |
1.1 2.2 |
|
93 |
1.1 |
|
97 |
1.1 |
|
103 |
1.1 |