When prototyping SisoDb I used DataTables
under the covers when consuming the SqlBulkCopy class to insert data. This meant that I had the source entities in memory as well as the DataTables
. Since the SqlBulkCopy
class can work with readers I created a very simple DbDataReader
implementation over my entities instead. I gained a little performance boost and got rid of unnecessary creation of in-memory DataTables
. At first the API of a DataReader
seems kind of tedious to implement, but in the context of using it in conjunction with the SqlBulkCopy
-class, there really aren't that many methods that need to be implemented:
public override int GetOrdinal(string name)
public override IEnumerator GetEnumerator()
public override bool Read()
public override void Close()
public override bool HasRows
public override int FieldCount
public override int RecordsAffected
public override bool IsClosed
My model contains two entities but three sets. Entities: Structure
, StructureIndex
; Sets: Structures
, Indexes
, Uniques
. The root is the Structure
which then contains the other sets. For me the simplest solution was to create one reader for each entity. Each custom reader extended a custom base-class: SingleResultReaderBase<T>
.
To keep track of: tablename
, columnnames
, ordinals
etc., I put together a very simple StorageSchema
base class and then one specific implementation for each entity (Structure
, StructureIndex
).
That's pretty much it. I will show you the code for how I handled the root entity. The complete code for the other entities can be found in the SisoDB repo.
SingleResultReaderBase<T>
/// <summary>
/// Minimal <see cref="DbDataReader"/> implementation over an in-memory sequence
/// of entities, intended solely for feeding <c>SqlBulkCopy.WriteToServer</c>.
/// Only the members SqlBulkCopy actually touches are implemented; the rest throw
/// <see cref="NotSupportedException"/>.
/// </summary>
internal abstract class SingleResultReaderBase<T>
    : DbDataReader where T : class
{
    /// <summary>Schema of the destination table: name, columns and ordinals.</summary>
    protected internal StorageSchemaBase StorageSchema { get; private set; }

    /// <summary>
    /// The source items, materialized once in the constructor so a lazy LINQ
    /// query is never enumerated more than a single time.
    /// </summary>
    protected IEnumerable<T> Items { get; private set; }

    /// <summary>Enumerator advanced by <see cref="Read"/>; null once closed.</summary>
    protected IEnumerator<T> Enumerator { get; private set; }

    // Cached so RecordsAffected/HasRows never re-enumerate the source.
    private readonly int _itemCount;

    protected SingleResultReaderBase(
        StorageSchemaBase storageSchema,
        IEnumerable<T> items)
    {
        if (storageSchema == null)
            throw new ArgumentNullException("storageSchema");
        if (items == null)
            throw new ArgumentNullException("items");

        StorageSchema = storageSchema;

        // Fix: the original stored the raw IEnumerable and called Count() on it
        // from RecordsAffected/HasRows, re-executing any deferred query each time.
        var materialized = items as IList<T> ?? items.ToList();
        _itemCount = materialized.Count;
        Items = materialized;
        Enumerator = Items.GetEnumerator();
    }

    public override bool IsClosed
    {
        get { return Enumerator == null; }
    }

    /// <summary>Repurposed by this reader to expose the total item count.</summary>
    public override int RecordsAffected
    {
        get { return _itemCount; }
    }

    public override int FieldCount
    {
        get { return StorageSchema.FieldCount(); }
    }

    public override bool HasRows
    {
        get { return Items != null && _itemCount > 0; }
    }

    public override void Close()
    {
        // Fix: the original dropped the enumerator without disposing it.
        if (Enumerator != null)
        {
            Enumerator.Dispose();
            Enumerator = null;
        }
        Items = null;
    }

    public override bool Read()
    {
        return Enumerator.MoveNext();
    }

    public override IEnumerator GetEnumerator()
    {
        return Enumerator;
    }

    /// <summary>Maps a column name to its ordinal via the storage schema.</summary>
    public override int GetOrdinal(string name)
    {
        return StorageSchema.FieldsByName[name].Index;
    }

    // Everything below is never called by SqlBulkCopy for this usage and is
    // deliberately left unsupported.

    public override object this[int ordinal]
    {
        get { throw new NotSupportedException(); }
    }

    public override object this[string name]
    {
        get { throw new NotSupportedException(); }
    }

    public override int Depth
    {
        get { throw new NotSupportedException(); }
    }

    public override string GetName(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override DataTable GetSchemaTable()
    {
        throw new NotSupportedException();
    }

    public override bool NextResult()
    {
        throw new NotSupportedException();
    }

    public override bool GetBoolean(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override byte GetByte(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
    {
        throw new NotSupportedException();
    }

    public override char GetChar(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetChars(
        int ordinal,
        long dataOffset,
        char[] buffer,
        int bufferOffset,
        int length)
    {
        throw new NotSupportedException();
    }

    public override Guid GetGuid(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override short GetInt16(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override int GetInt32(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetInt64(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override DateTime GetDateTime(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override string GetString(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override int GetValues(object[] values)
    {
        throw new NotSupportedException();
    }

    public override bool IsDBNull(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override decimal GetDecimal(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override double GetDouble(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override float GetFloat(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override string GetDataTypeName(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override Type GetFieldType(int ordinal)
    {
        throw new NotSupportedException();
    }
}
StructuresReader
/// <summary>
/// Reader over the root <c>IStructure</c> entities, feeding the Structures
/// table (columns: Id, Json) to SqlBulkCopy.
/// </summary>
internal class StructuresReader : SingleResultReaderBase<IStructure>
{
    internal StructuresReader(
        StructureStorageSchema storageSchema,
        IEnumerable<IStructure> items) : base(storageSchema, items)
    {
    }

    /// <summary>
    /// Returns the value of the column at <paramref name="ordinal"/> for the
    /// current row. Supports the Id column (identity or GUID ids) and the
    /// Json column; anything else is unsupported.
    /// </summary>
    public override object GetValue(int ordinal)
    {
        var schemaField = StorageSchema.FieldsByIndex[ordinal];
        var current = Enumerator.Current;

        if (schemaField.Name == StructureStorageSchema.Fields.Id.Name)
        {
            if (current.Id.IdType == IdTypes.Identity)
                // Invariant culture: id values are machine-generated text,
                // never culture-formatted user input (CA1305).
                return int.Parse(current.Id.Value, System.Globalization.CultureInfo.InvariantCulture);

            if (current.Id.IdType == IdTypes.Guid)
                return Guid.Parse(current.Id.Value);

            throw new NotSupportedException(
                "Only Identity and Guid id-types can be bulk inserted.");
        }

        if (schemaField.Name == StructureStorageSchema.Fields.Json.Name)
            return current.Json;

        throw new NotSupportedException(
            "The field '" + schemaField.Name + "' has no mapping in StructuresReader.");
    }
}
StorageSchema
[Serializable]
/// <summary>
/// Base class tracking a destination table's name plus its fields keyed both
/// by ordinal and by column name.
/// </summary>
[Serializable]
internal abstract class StorageSchemaBase
{
    /// <summary>Fields keyed by ordinal; populated once by <see cref="InitializeFields"/>.</summary>
    internal readonly Dictionary<int, SchemaField> FieldsByIndex;

    /// <summary>Fields keyed by column name; populated once by <see cref="InitializeFields"/>.</summary>
    internal readonly Dictionary<string, SchemaField> FieldsByName;

    /// <summary>Name of the destination table.</summary>
    internal string Name { get; private set; }

    protected StorageSchemaBase(string name)
    {
        // Fix: fail fast with a descriptive exception instead of silently
        // accepting an unusable table name.
        if (name == null)
            throw new ArgumentNullException("name");
        if (name.Length == 0)
            throw new ArgumentException("The table name may not be empty.", "name");

        FieldsByIndex = new Dictionary<int, SchemaField>();
        FieldsByName = new Dictionary<string, SchemaField>();
        Name = name;

        // NOTE: virtual call from a constructor — it runs before any derived
        // constructor body, so implementations must not depend on derived-class
        // instance state (the existing subclasses only use static fields).
        InitializeFields();
    }

    /// <summary>Populates <see cref="FieldsByIndex"/> and <see cref="FieldsByName"/>.</summary>
    protected abstract void InitializeFields();

    /// <summary>Number of fields in the schema.</summary>
    public int FieldCount()
    {
        return FieldsByIndex.Count;
    }
}
/// <summary>
/// Storage schema for the Structures table: Id at ordinal 0, Json at ordinal 1.
/// </summary>
[Serializable]
internal class StructureStorageSchema
    : StorageSchemaBase
{
    /// <summary>The fixed set of columns, in ordinal order.</summary>
    internal static class Fields
    {
        internal static readonly SchemaField Id = new SchemaField(0, "Id");
        internal static readonly SchemaField Json = new SchemaField(1, "Json");

        internal static SchemaField[] GetOrderedFields()
        {
            return new[] { Id, Json };
        }
    }

    internal StructureStorageSchema(IStructureSchema structureSchema)
        : base(GetTableName(structureSchema))
    {
    }

    // Fix: the original dereferenced structureSchema directly in the base-call,
    // so a null argument surfaced as a NullReferenceException instead of a
    // descriptive ArgumentNullException.
    private static string GetTableName(IStructureSchema structureSchema)
    {
        if (structureSchema == null)
            throw new ArgumentNullException("structureSchema");

        return structureSchema.GetStructureTableName();
    }

    protected override void InitializeFields()
    {
        // Register each column under both its ordinal and its name.
        foreach (var field in Fields.GetOrderedFields())
        {
            FieldsByIndex.Add(field.Index, field);
            FieldsByName.Add(field.Name, field);
        }
    }
}
Then to consume the reader you would have code looking something like this:
// Stream the entities straight into the destination table through the custom reader.
using(var structuresReader = new StructuresReader(structureStorageSchema, structures))
{
// KeepIdentity preserves the source id values instead of letting SQL Server generate new ones.
using (var structuresBulkInserter = new SqlBulkCopy(
_connection,
SqlBulkCopyOptions.Default | SqlBulkCopyOptions.KeepIdentity,
// NOTE(review): "_transatcion" looks like a typo for "_transaction" — the field
// is declared elsewhere, so verify against the declaring class before renaming.
_transatcion))
{
// RecordsAffected is the item count here, so everything goes in a single batch.
structuresBulkInserter.BatchSize = structuresReader
.RecordsAffected;
// The storage schema carries the destination table name and the column set.
structuresBulkInserter.DestinationTableName = structuresReader
.StorageSchema.Name;
structuresBulkInserter.NotifyAfter = 0;
// Map every schema column by name; GetOrdinal on the reader resolves them back.
foreach (var field in structuresReader.StorageSchema.FieldsByIndex.Values)
structuresBulkInserter.ColumnMappings.Add(field.Name, field.Name);
structuresBulkInserter.WriteToServer(structuresReader);
structuresBulkInserter.Close();
}
}
That's it.
//Daniel