Tapeti/Tapeti.Cmd/Serialization/SingleFileJSONMessageSerial...

235 lines
8.9 KiB
C#

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
namespace Tapeti.Cmd.Serialization
{
public class SingleFileJSONMessageSerializer : IMessageSerializer
{
private readonly string path;
private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};
private readonly Lazy<StreamWriter> exportFile;
public SingleFileJSONMessageSerializer(string path)
{
this.path = path;
exportFile = new Lazy<StreamWriter>(() => new StreamWriter(path, false, Encoding.UTF8));
}
public void Serialize(Message message)
{
var serializableMessage = new SerializableMessage(message);
var serialized = JsonConvert.SerializeObject(serializableMessage, SerializerSettings);
exportFile.Value.WriteLine(serialized);
}
public IEnumerable<Message> Deserialize()
{
using (var file = new StreamReader(path))
{
while (!file.EndOfStream)
{
var serialized = file.ReadLine();
if (string.IsNullOrEmpty(serialized))
continue;
var serializableMessage = JsonConvert.DeserializeObject<SerializableMessage>(serialized);
if (serializableMessage == null)
continue;
yield return serializableMessage.ToMessage();
}
}
}
public void Dispose()
{
if (exportFile.IsValueCreated)
exportFile.Value.Dispose();
}
// ReSharper disable MemberCanBePrivate.Local - used for JSON serialization
// ReSharper disable NotAccessedField.Local
// ReSharper disable FieldCanBeMadeReadOnly.Local
private class SerializableMessage
{
public ulong DeliveryTag;
public bool Redelivered;
public string Exchange;
public string RoutingKey;
public string Queue;
// ReSharper disable once FieldCanBeMadeReadOnly.Local - must be settable by JSON deserialization
public SerializableMessageProperties Properties;
public JObject Body;
public byte[] RawBody;
// ReSharper disable once UnusedMember.Global - used by JSON deserialization
// ReSharper disable once UnusedMember.Local
public SerializableMessage()
{
Properties = new SerializableMessageProperties();
}
public SerializableMessage(Message fromMessage)
{
DeliveryTag = fromMessage.DeliveryTag;
Redelivered = fromMessage.Redelivered;
Exchange = fromMessage.Exchange;
RoutingKey = fromMessage.RoutingKey;
Queue = fromMessage.Queue;
Properties = new SerializableMessageProperties(fromMessage.Properties);
// If this is detected as a JSON message, include the object directly in the JSON line so that it is easier
// to read and process in the output file. Otherwise simply include the raw data and let Newtonsoft encode it.
// This does mean the message will be rewritten. If this is an issue, feel free to add a "raw" option to this tool
// that forces the RawBody to be used. It is open-source after all :-).
if (Properties.ContentType == "application/json")
{
try
{
Body = JObject.Parse(Encoding.UTF8.GetString(fromMessage.Body));
RawBody = null;
}
catch
{
// Fall back to using the raw body
Body = null;
RawBody = fromMessage.Body;
}
}
else
{
Body = null;
RawBody = fromMessage.Body;
}
}
public Message ToMessage()
{
return new Message
{
DeliveryTag = DeliveryTag,
Redelivered = Redelivered,
Exchange = Exchange,
RoutingKey = RoutingKey,
Queue = Queue,
Properties = Properties.ToBasicProperties(),
Body = Body != null
? Encoding.UTF8.GetBytes(Body.ToString(Formatting.None))
: RawBody
};
}
}
// IBasicProperties is finicky when it comes to writing it's properties,
// so we need this normalized class to read and write it from and to JSON
private class SerializableMessageProperties
{
public string AppId;
public string ClusterId;
public string ContentEncoding;
public string ContentType;
public string CorrelationId;
public byte? DeliveryMode;
public string Expiration;
public IDictionary<string, string> Headers;
public string MessageId;
public byte? Priority;
public string ReplyTo;
public long? Timestamp;
public string Type;
public string UserId;
public SerializableMessageProperties()
{
}
public SerializableMessageProperties(IBasicProperties fromProperties)
{
AppId = fromProperties.AppId;
ClusterId = fromProperties.ClusterId;
ContentEncoding = fromProperties.ContentEncoding;
ContentType = fromProperties.ContentType;
CorrelationId = fromProperties.CorrelationId;
DeliveryMode = fromProperties.IsDeliveryModePresent() ? (byte?)fromProperties.DeliveryMode : null;
Expiration = fromProperties.Expiration;
MessageId = fromProperties.MessageId;
Priority = fromProperties.IsPriorityPresent() ? (byte?) fromProperties.Priority : null;
ReplyTo = fromProperties.ReplyTo;
Timestamp = fromProperties.IsTimestampPresent() ? (long?)fromProperties.Timestamp.UnixTime : null;
Type = fromProperties.Type;
UserId = fromProperties.UserId;
if (fromProperties.IsHeadersPresent())
{
Headers = new Dictionary<string, string>();
// This assumes header values are UTF-8 encoded strings. This is true for Tapeti.
foreach (var pair in fromProperties.Headers)
Headers.Add(pair.Key, Encoding.UTF8.GetString((byte[])pair.Value));
}
else
Headers = null;
}
public IBasicProperties ToBasicProperties()
{
var properties = new BasicProperties();
if (!string.IsNullOrEmpty(AppId)) properties.AppId = AppId;
if (!string.IsNullOrEmpty(ClusterId)) properties.ClusterId = ClusterId;
if (!string.IsNullOrEmpty(ContentEncoding)) properties.ContentEncoding = ContentEncoding;
if (!string.IsNullOrEmpty(ContentType)) properties.ContentType = ContentType;
if (DeliveryMode.HasValue) properties.DeliveryMode = DeliveryMode.Value;
if (!string.IsNullOrEmpty(Expiration)) properties.Expiration = Expiration;
if (!string.IsNullOrEmpty(MessageId)) properties.MessageId = MessageId;
if (Priority.HasValue) properties.Priority = Priority.Value;
if (!string.IsNullOrEmpty(ReplyTo)) properties.ReplyTo = ReplyTo;
if (Timestamp.HasValue) properties.Timestamp = new AmqpTimestamp(Timestamp.Value);
if (!string.IsNullOrEmpty(Type)) properties.Type = Type;
if (!string.IsNullOrEmpty(UserId)) properties.UserId = UserId;
// ReSharper disable once InvertIf
if (Headers != null)
{
properties.Headers = new Dictionary<string, object>();
foreach (var pair in Headers)
properties.Headers.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value));
}
return properties;
}
}
// ReSharper restore FieldCanBeMadeReadOnly.Local
// ReSharper restore NotAccessedField.Local
// ReSharper restore MemberCanBePrivate.Local
}
}