1
0
mirror of synced 2024-11-15 01:33:51 +00:00
PettingZoo/PettingZoo.Tapeti/ExportImport/TapetiCmdImportFormat.cs

76 lines
3.3 KiB
C#
Raw Permalink Normal View History

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using PettingZoo.Core.Connection;
using PettingZoo.Core.ExportImport.Subscriber;
namespace PettingZoo.Tapeti.ExportImport
{
public class TapetiCmdImportFormat : BaseTapetiCmdExportImportFormat, IImportFormat
{
private const int DefaultBufferSize = 1024;
public async Task<IReadOnlyList<ReceivedMessageInfo>> Import(Stream stream, CancellationToken cancellationToken)
{
var now = DateTime.Now;
using var reader = new StreamReader(stream, Encoding.UTF8, true, DefaultBufferSize, true);
var messages = new List<ReceivedMessageInfo>();
while (!reader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
var serialized = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(serialized))
continue;
var serializableMessage = JsonConvert.DeserializeObject<SerializableMessage>(serialized);
if (serializableMessage == null)
continue;
var body = serializableMessage.Body != null
? Encoding.UTF8.GetBytes(serializableMessage.Body.ToString(Formatting.Indented))
: serializableMessage.RawBody;
var messageTimestamp = serializableMessage.Properties?.Timestamp != null
? DateTimeOffset.FromUnixTimeSeconds(serializableMessage.Properties.Timestamp.Value).LocalDateTime
: now;
messages.Add(new ReceivedMessageInfo(
serializableMessage.Exchange ?? "",
serializableMessage.RoutingKey ?? "",
body ?? Array.Empty<byte>(),
// IReadOnlyDictionary is not compatible with IDictionary? wow.
new MessageProperties(serializableMessage.Properties?.Headers?.ToDictionary(p => p.Key, p => p.Value))
{
AppId = serializableMessage.Properties?.AppId,
ContentEncoding = serializableMessage.Properties?.ContentEncoding,
ContentType = serializableMessage.Properties?.ContentType,
CorrelationId = serializableMessage.Properties?.CorrelationId,
DeliveryMode = serializableMessage.Properties?.DeliveryMode switch
{
2 => MessageDeliveryMode.Persistent,
_ => MessageDeliveryMode.NonPersistent
},
Expiration = serializableMessage.Properties?.Expiration,
MessageId = serializableMessage.Properties?.MessageId,
Priority = serializableMessage.Properties?.Priority,
ReplyTo = serializableMessage.Properties?.ReplyTo,
Timestamp = messageTimestamp,
Type = serializableMessage.Properties?.Type,
UserId = serializableMessage.Properties?.UserId
},
messageTimestamp));
}
return messages;
}
}
}