danielwertheim


notes from a passionate developer

Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

C# - Custom datareader for SqlBulkCopy

When prototyping SisoDb I used DataTables under the covers when consuming the SqlBulkCopy class to insert data. This meant that I held both the source entities and the DataTables in memory. Since the SqlBulkCopy class can also work with readers, I created a very simple DbDataReader implementation over my entities instead. I gained a small performance boost and got rid of the unnecessary in-memory DataTables. At first the API of a DataReader seems tedious to implement, but when it is only used together with the SqlBulkCopy class, there really aren't that many methods that need to be implemented.
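To make the memory argument concrete, here is a rough sketch of what the DataTable route looks like. It is not the original SisoDb code: the `DataTableApproach` class, the `"Structures"` table name and the use of `KeyValuePair<Guid, string>` as a stand-in for an entity are assumptions made for illustration. Only the column names Id and Json are taken from the schema shown later in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical helper, only here to illustrate the DataTable route.
public static class DataTableApproach
{
    // Copies every (id, json) pair into a row of a new DataTable.
    // The source sequence and the table then both hold the data.
    public static DataTable ToDataTable(
        IEnumerable<KeyValuePair<Guid, string>> structures)
    {
        var table = new DataTable("Structures"); // assumed table name
        table.Columns.Add("Id", typeof(Guid));
        table.Columns.Add("Json", typeof(string));

        foreach (var structure in structures)
            table.Rows.Add(structure.Key, structure.Value);

        return table;
    }
}
```

The resulting table can be handed to SqlBulkCopy.WriteToServer(DataTable), but every value now exists twice in memory, which is exactly the duplication a reader over the entities avoids.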

My model contains two entities but three sets. Entities: Structure, StructureIndex; Sets: Structures, Indexes, Uniques. The root is the Structure, which contains the other sets. For me, the simplest solution was to create one reader for each entity. Each custom reader extends a custom base class: SingleResultReaderBase<T>.

To keep track of the table name, column names, ordinals, etc., I put together a very simple StorageSchema base class and then one specific implementation for each entity (Structure, StructureIndex).

That's pretty much it. I will show you the code for how I handled the root entity. The complete code for the other entities can be found in the SisoDb repo.

SingleResultReaderBase<T>

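using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;

internal abstract class SingleResultReaderBase<T>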
    : DbDataReader where T : class 
{
    protected internal StorageSchemaBase StorageSchema { get; private set; }

    protected IEnumerable<T> Items { get; private set; }

    protected IEnumerator<T> Enumerator { get; private set; }

    protected SingleResultReaderBase(
        StorageSchemaBase storageSchema,
        IEnumerable<T> items)
    {
        StorageSchema = storageSchema;
        Items = items;
        Enumerator = Items.GetEnumerator();
    }

    public override bool IsClosed
    {
        get { return Enumerator == null; }
    }

    public override int RecordsAffected
    {
        // Note: Count() walks the whole sequence unless Items is a collection.
        get { return Items.Count(); }
    }

    public override int FieldCount
    {
        get { return StorageSchema.FieldCount(); }
    }

    public override bool HasRows
    {
        // Any() stops at the first element instead of counting them all.
        get { return Items != null && Items.Any(); }
    }

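    public override void Close()
    {
        // Release the enumerator before dropping the references.
        if (Enumerator != null)
            Enumerator.Dispose();

        Enumerator = null;
        Items = null;
    }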
    
    public override bool Read()
    {
        return Enumerator.MoveNext();
    }

    public override IEnumerator GetEnumerator()
    {
        return Enumerator;
    }

    public override int GetOrdinal(string name)
    {
        return StorageSchema.FieldsByName[name].Index;
    }

    public override object this[int ordinal]
    {
        get { throw new NotSupportedException(); }
    }

    public override object this[string name]
    {
        get { throw new NotSupportedException(); }
    }

    public override int Depth
    {
        get { throw new NotSupportedException(); }
    }

    public override string GetName(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override DataTable GetSchemaTable()
    {
        throw new NotSupportedException();
    }

    public override bool NextResult()
    {
        throw new NotSupportedException();
    }

    public override bool GetBoolean(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override byte GetByte(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
    {
        throw new NotSupportedException();
    }

    public override char GetChar(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetChars(
        int ordinal,
        long dataOffset,
        char[] buffer,
        int bufferOffset,
        int length)
    {
        throw new NotSupportedException();
    }

    public override Guid GetGuid(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override short GetInt16(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override int GetInt32(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override long GetInt64(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override DateTime GetDateTime(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override string GetString(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override int GetValues(object[] values)
    {
        throw new NotSupportedException();
    }

    public override bool IsDBNull(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override decimal GetDecimal(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override double GetDouble(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override float GetFloat(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override string GetDataTypeName(int ordinal)
    {
        throw new NotSupportedException();
    }

    public override Type GetFieldType(int ordinal)
    {
        throw new NotSupportedException();
    }
}

StructuresReader

internal class StructuresReader : SingleResultReaderBase<IStructure>
{
    internal StructuresReader(
        StructureStorageSchema storageSchema,
        IEnumerable<IStructure> items) : base(storageSchema, items)
    {
    }

    public override object GetValue(int ordinal)
    {
        var schemaField = StorageSchema.FieldsByIndex[ordinal];
        if(schemaField.Name == StructureStorageSchema.Fields.Id.Name)
        {
            if (Enumerator.Current.Id.IdType == IdTypes.Identity)
                return int.Parse(Enumerator.Current.Id.Value);

            if (Enumerator.Current.Id.IdType == IdTypes.Guid)
                return Guid.Parse(Enumerator.Current.Id.Value);

            throw new NotSupportedException();
        }

        if(schemaField.Name == StructureStorageSchema.Fields.Json.Name)
            return Enumerator.Current.Json;

        throw new NotSupportedException();
    }
}
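The readers above depend on SisoDb types, so here is a self-contained sketch of the same pattern that can be compiled and stepped through without a database. `Doc`, `DocReader` and `Program` are made-up names for this illustration and are not part of SisoDb. Only the members that do real work are implemented, and everything else throws. Depending on the SqlClient version and on options such as EnableStreaming, SqlBulkCopy may call members beyond these (IsDBNull is one candidate), so treat the stubs as something to revisit rather than as a guaranteed minimum.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;

// Hypothetical stand-in for a structure: an id and a JSON payload.
public sealed class Doc
{
    public Guid Id { get; set; }
    public string Json { get; set; }
}

// Forward-only reader over in-memory Docs with fixed columns Id (0) and Json (1).
public sealed class DocReader : DbDataReader
{
    private static readonly string[] Names = { "Id", "Json" };
    private IEnumerator<Doc> _enumerator;

    public DocReader(IEnumerable<Doc> docs) { _enumerator = docs.GetEnumerator(); }

    // The members that do real work.
    public override int FieldCount => Names.Length;
    public override bool IsClosed => _enumerator == null;
    public override bool Read() => _enumerator.MoveNext();
    public override int GetOrdinal(string name)
    {
        var index = Array.IndexOf(Names, name);
        if (index < 0) throw new IndexOutOfRangeException(name);
        return index;
    }
    public override object GetValue(int ordinal)
    {
        switch (ordinal)
        {
            case 0: return _enumerator.Current.Id;
            case 1: return (object)_enumerator.Current.Json ?? DBNull.Value;
            default: throw new IndexOutOfRangeException();
        }
    }
    public override void Close()
    {
        if (_enumerator != null) _enumerator.Dispose();
        _enumerator = null;
    }

    // Everything else is deliberately unsupported in this sketch.
    private static Exception No() => new NotSupportedException();
    public override int Depth => throw No();
    public override bool HasRows => throw No();
    public override int RecordsAffected => throw No();
    public override object this[int ordinal] => throw No();
    public override object this[string name] => throw No();
    public override bool NextResult() => throw No();
    public override IEnumerator GetEnumerator() => throw No();
    public override DataTable GetSchemaTable() => throw No();
    public override string GetName(int ordinal) => throw No();
    public override string GetDataTypeName(int ordinal) => throw No();
    public override Type GetFieldType(int ordinal) => throw No();
    public override int GetValues(object[] values) => throw No();
    public override bool IsDBNull(int ordinal) => throw No();
    public override bool GetBoolean(int ordinal) => throw No();
    public override byte GetByte(int ordinal) => throw No();
    public override char GetChar(int ordinal) => throw No();
    public override Guid GetGuid(int ordinal) => throw No();
    public override short GetInt16(int ordinal) => throw No();
    public override int GetInt32(int ordinal) => throw No();
    public override long GetInt64(int ordinal) => throw No();
    public override float GetFloat(int ordinal) => throw No();
    public override double GetDouble(int ordinal) => throw No();
    public override decimal GetDecimal(int ordinal) => throw No();
    public override DateTime GetDateTime(int ordinal) => throw No();
    public override string GetString(int ordinal) => throw No();
    public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length) => throw No();
    public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length) => throw No();
}

public static class Program
{
    public static void Main()
    {
        var docs = new[] { new Doc { Id = Guid.NewGuid(), Json = "{\"a\":1}" } };
        using (var reader = new DocReader(docs))
        {
            var jsonOrdinal = reader.GetOrdinal("Json");
            while (reader.Read())
                Console.WriteLine("{0}: {1}", reader.GetValue(0), reader.GetValue(jsonOrdinal));
        }
    }
}
```

Stepping through the reader by hand like this is a cheap way to check the ordinals and values before pointing SqlBulkCopy at it. Disposing the reader ends up in Close(), which is why the enumerator is released there.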

StorageSchema

[Serializable]
internal abstract class StorageSchemaBase
{
    internal readonly Dictionary<int, SchemaField> FieldsByIndex;
    internal readonly Dictionary<string, SchemaField> FieldsByName;

    internal string Name { get; private set; }

    protected StorageSchemaBase(string name)
    {
        FieldsByIndex = new Dictionary<int, SchemaField>();
        FieldsByName = new Dictionary<string, SchemaField>();
        Name = name;
        
        InitializeFields();
    }

    protected abstract void InitializeFields();

    public int FieldCount()
    {
        return FieldsByIndex.Count;
    }
}

[Serializable]
internal class StructureStorageSchema
    : StorageSchemaBase
{
    internal static class Fields
    {
        internal static readonly SchemaField Id = new SchemaField(0, "Id");
        internal static readonly SchemaField Json = new SchemaField(1, "Json");

        internal static SchemaField[] GetOrderedFields()
        {
            return new[] { Id, Json };
        }
    }

    internal StructureStorageSchema(IStructureSchema structureSchema) 
        : base(structureSchema.GetStructureTableName())
    {
    }

    protected override void InitializeFields()
    {
        foreach (var field in Fields.GetOrderedFields())
        {
            FieldsByIndex.Add(field.Index, field);
            FieldsByName.Add(field.Name, field);
        }
    }
}
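The SchemaField type used by the schemas is not shown in this post. Judging only from how it is used above (a constructor taking an index and a name, plus Index and Name members), a minimal version could look like the sketch below. This shape is an assumption, and the real SisoDb class may well differ.

```csharp
using System;

// Assumed shape, inferred from usage in the listings above;
// not copied from the SisoDb source.
[Serializable]
internal sealed class SchemaField
{
    // Zero-based column ordinal, as handed to GetValue(int ordinal).
    internal int Index { get; private set; }

    // Column name, used for the SqlBulkCopy column mappings.
    internal string Name { get; private set; }

    internal SchemaField(int index, string name)
    {
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (string.IsNullOrEmpty(name))
            throw new ArgumentException("A field needs a name.", "name");

        Index = index;
        Name = name;
    }
}
```

Keeping both the ordinal and the name on one small object is what lets the schema expose the FieldsByIndex and FieldsByName lookups over the same instances.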

To consume the reader, you would then write something like this:

using(var structuresReader = new StructuresReader(structureStorageSchema, structures))
{
    using (var structuresBulkInserter = new SqlBulkCopy(
        _connection,
        SqlBulkCopyOptions.Default | SqlBulkCopyOptions.KeepIdentity,
        _transaction))
    {
        structuresBulkInserter.BatchSize = structuresReader
            .RecordsAffected;
        structuresBulkInserter.DestinationTableName = structuresReader
            .StorageSchema.Name;
        structuresBulkInserter.NotifyAfter = 0;

        foreach (var field in structuresReader.StorageSchema.FieldsByIndex.Values)
            structuresBulkInserter.ColumnMappings.Add(field.Name, field.Name);

        structuresBulkInserter.WriteToServer(structuresReader);
        structuresBulkInserter.Close();
    }
}

That's it.

//Daniel
