1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
| private void onReceiveMessageResult(ReceiveMessageResult result) { final List<MessageViewImpl> messages = result.getMessageViewImpls(); if (!messages.isEmpty()) { cacheMessages(messages); receivedMessagesQuantity.getAndAdd(messages.size()); consumer.getReceivedMessagesQuantity().getAndAdd(messages.size()); consumer.getConsumeService().consume(this, messages); } receiveMessage(); }
class FifoConsumeService extends ConsumeService { private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);
public FifoConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) { super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler); }
@Override public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) { consumeIteratively(pq, messageViews.iterator()); }
public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) { if (!iterator.hasNext()) { return; } final MessageViewImpl messageView = iterator.next(); if (messageView.isCorrupted()) { log.error("Message is corrupted for FIFO consumption, prepare to discard it, mq={}, messageId={}, " + "clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId); pq.discardFifoMessage(messageView); consumeIteratively(pq, iterator); return; } final ListenableFuture<ConsumeResult> future0 = consume(messageView); ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(messageView, result), MoreExecutors.directExecutor()); future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor()); } }
class ProcessQueueImpl implements ProcessQueue { public ListenableFuture<Void> eraseFifoMessage(MessageViewImpl messageView, ConsumeResult consumeResult) { statsConsumptionResult(consumeResult); final RetryPolicy retryPolicy = consumer.getRetryPolicy(); final int maxAttempts = retryPolicy.getMaxAttempts(); int attempt = messageView.getDeliveryAttempt(); final MessageId messageId = messageView.getMessageId(); final ConsumeService service = consumer.getConsumeService(); final ClientId clientId = consumer.getClientId(); if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) { final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt); attempt = messageView.incrementAndGetDeliveryAttempt(); log.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," + " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq, messageId, nextAttemptDelay, clientId); final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay); return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result), MoreExecutors.directExecutor()); } boolean ok = ConsumeResult.SUCCESS.equals(consumeResult); if (!ok) { log.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, " + "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, mq, messageId, clientId); } ListenableFuture<Void> future = ok ? ackMessage(messageView) : forwardToDeadLetterQueue(messageView); future.addListener(() -> evictCache(messageView), consumer.getConsumptionExecutor()); return future; } private void ackMessage(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) { final ClientId clientId = consumer.getClientId(); final String consumerGroup = consumer.getConsumerGroup(); final MessageId messageId = messageView.getMessageId(); final Endpoints endpoints = messageView.getEndpoints(); final RpcFuture<AckMessageRequest, AckMessageResponse> future = consumer.ackMessage(messageView); Futures.addCallback(future, new FutureCallback<AckMessageResponse>() { @Override public void onSuccess(AckMessageResponse response) { final String requestId = future.getContext().getRequestId(); final Status status = response.getStatus(); final Code code = status.getCode(); if (Code.INVALID_RECEIPT_HANDLE.equals(code)) { log.error("Failed to ack message due to the invalid receipt handle, forgive to retry, " + "clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, " + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq, endpoints, requestId, status.getMessage()); future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage())); return; } if (!Code.OK.equals(code)) { log.error("Failed to ack message, would attempt to re-ack later, clientId={}, " + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, " + "status message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, requestId, endpoints, status.getMessage()); ackMessageLater(messageView, 1 + attempt, future0); return; } future0.setFuture(Futures.immediateVoidFuture()); if (1 < attempt) { log.info("Finally, ack message successfully, clientId={}, consumerGroup={}, attempt={}, " + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt, messageId, mq, endpoints, requestId); return; } log.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, " + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId); }
@Override public void onFailure(Throwable t) { log.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, " + "would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt, messageId, mq, endpoints, t); ackMessageLater(messageView, 1 + attempt, future0); } }, MoreExecutors.directExecutor()); } }
|